Background:
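The task: parse the JSON request payloads out of Tomcat logs and write them into a database. The application runs on 4 servers, and each server produces about 20 GB of logs per month. I split each server's logs into 10 text files of 2 GB each. Reading an unsplit 20 GB file with FileInputStream + Scanner would, by my estimate, have taken more than ten days.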
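The post does not say how the 20 GB logs were cut into 2 GB parts. One way to do it in Java, sketched here under the assumption that the logs are UTF-8 text with '\n' line endings, is to stream the source and start a new output file whenever the next line would push the current part past a size limit, so no log line is ever split in half. The class LogSplitter, its split method and the part_NN.txt naming are hypothetical.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class LogSplitter {

    // Splits `source` into parts of at most about `maxBytesPerPart` bytes, cutting only at
    // line boundaries. Part sizes are tracked from the UTF-8 length of each line plus '\n'.
    static List<Path> split(Path source, Path outDir, long maxBytesPerPart) throws IOException {
        List<Path> parts = new ArrayList<>();
        BufferedWriter out = null;
        try (BufferedReader in = Files.newBufferedReader(source, StandardCharsets.UTF_8)) {
            long written = 0;
            String line;
            while ((line = in.readLine()) != null) {
                long lineBytes = line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'
                // Open a new part before the first line, or when this line would overflow the current part
                if (out == null || written + lineBytes > maxBytesPerPart) {
                    if (out != null) {
                        out.close();
                    }
                    Path part = outDir.resolve(String.format("part_%02d.txt", parts.size() + 1));
                    out = Files.newBufferedWriter(part, StandardCharsets.UTF_8);
                    parts.add(part);
                    written = 0;
                }
                out.write(line);
                out.write('\n'); // fixed '\n' so the byte count above stays accurate on Windows too
                written += lineBytes;
            }
        } finally {
            if (out != null) {
                out.close(); // close the last (or partially written) part
            }
        }
        return parts;
    }
}
```

For example, LogSplitter.split(Paths.get("catalina.out"), Paths.get("parts"), 2L << 30) would write parts of at most about 2 GiB each into an existing "parts" directory; a single line longer than the limit still ends up in a part of its own.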
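Reading the logs with Scanner was fast at first, but as the read went on and the I/O cost piled up, it slowed to a crawl. After switching to BufferedReader, which pulls the file into an in-memory buffer in large chunks and splits lines from there, reading was about 100 times faster in my runs, and that is not an exaggeration.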
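A minimal side-by-side sketch of the two reading loops, run against a synthetic temporary file (the log line format here is made up). One plausible reason for the gap is that Scanner locates each line with a regular expression over a small internal buffer, while BufferedReader.readLine scans a plain char buffer (8192 chars by default; the 1 MB size below is an arbitrary choice). Timings depend on the machine, the JDK and the file, so the sketch only prints them and does not assert which loop is faster.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Scanner;

public class ReadCompare {

    // Counts lines containing `marker`, reading with Scanner as in the first version.
    static long countWithScanner(Path file, String marker) throws IOException {
        long hits = 0;
        try (InputStream in = Files.newInputStream(file);
             Scanner sc = new Scanner(in, "UTF-8")) {
            while (sc.hasNextLine()) {
                if (sc.nextLine().contains(marker)) {
                    hits++;
                }
            }
        }
        return hits;
    }

    // Counts the same lines with BufferedReader and an explicit 1 MB char buffer.
    static long countWithBufferedReader(Path file, String marker) throws IOException {
        long hits = 0;
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(Files.newInputStream(file), StandardCharsets.UTF_8), 1 << 20)) {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.contains(marker)) {
                    hits++;
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Synthetic log: every 10th line carries an imageCode payload
        Path file = Files.createTempFile("catalina", ".txt");
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 200_000; i++) {
            sb.append(i % 10 == 0 ? "INFO data={\"imageCode\":\"x\"}\n" : "INFO other line\n");
        }
        Files.write(file, sb.toString().getBytes(StandardCharsets.UTF_8));

        String marker = "data={\"imageCode\"";
        long t0 = System.nanoTime();
        long a = countWithScanner(file, marker);
        long t1 = System.nanoTime();
        long b = countWithBufferedReader(file, marker);
        long t2 = System.nanoTime();
        System.out.printf("Scanner: %d hits, %d ms%n", a, (t1 - t0) / 1_000_000);
        System.out.printf("BufferedReader: %d hits, %d ms%n", b, (t2 - t1) / 1_000_000);
        Files.delete(file);
    }
}
```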
Code using Scanner:
// Assumed imports: java.io.*, java.util.*, java.util.concurrent.CountDownLatch,
// com.alibaba.fastjson.JSON (fastjson) and com.google.common.collect.Lists (Guava).
// The enclosing method signature was not shown in the original; this name is illustrative.
public void importWithScanner() {
    long start = System.currentTimeMillis();
    List<TsatSampleHorizontalLog> list = new ArrayList<>();
    // Open the file; try-with-resources closes the Scanner and the underlying stream
    try (Scanner sc = new Scanner(new FileInputStream("G:\\满意度数据恢复\\catalina_202211b\\catalina_202211b_01.txt"), "UTF-8")) {
        while (sc.hasNextLine()) {
            String a = sc.nextLine(); // the line just read
            // Keep imageCode requests, skip lines that also carry an smsCode payload
            if (a.contains("data={\"imageCode\"") && !a.contains("smsCode={\"")) {
                System.out.println(a);
                // Cut out the JSON object: from the '{' after "data=" to the first '}' after it
                int begin = a.indexOf("data={\"imageCode\":");
                int jsonEnd = begin < 0 ? -1 : a.indexOf('}', begin);
                if (jsonEnd < 0) {
                    continue; // no complete payload on this line
                }
                a = a.substring(begin + 5, jsonEnd + 1);
                System.out.println(a);
                TsatSampleHorizontalLog satSampleHorizontalLog = JSON.parseObject(a, TsatSampleHorizontalLog.class);
                list.add(satSampleHorizontalLog);
            }
        }
        System.out.println(list.size());
        long end = System.currentTimeMillis();
        System.out.println("---------------- read time (ms): -------------" + (end - start));
        // Split the List into sublists of 1000 records and hand each one to an async task
        List<List<TsatSampleHorizontalLog>> lists = Lists.partition(list, 1000);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        long startInsert = System.currentTimeMillis();
        for (List<TsatSampleHorizontalLog> sub : lists) {
            asyncService.executeAsync(sub, countDownLatch);
        }
        countDownLatch.await(); // wait until every batch has called countDown()
        long endInsert = System.currentTimeMillis();
        System.out.println("---------------- insert finished, time (ms): -------------" + (endInsert - startInsert));
    } catch (IOException e) { // also covers FileNotFoundException
        e.printStackTrace();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        e.printStackTrace();
    }
}
Code using BufferedReader:
// Same assumed imports as above, plus java.nio.charset.StandardCharsets.
// The enclosing method signature was not shown in the original; this name is illustrative.
public void importWithBufferedReader() {
    long start = System.currentTimeMillis();
    List<TsatSampleHorizontalLog> list = new ArrayList<>();
    // Open the file with an explicit UTF-8 decoder. FileReader would use the platform default
    // charset, which on a Chinese-locale Windows machine is usually GBK (before Java 18).
    try (BufferedReader in = new BufferedReader(new InputStreamReader(
            new FileInputStream("G:\\满意度数据恢复\\catalina_202211b\\catalina_202211b_02.txt"), StandardCharsets.UTF_8))) {
        String str; // holds the line just read
        while ((str = in.readLine()) != null) {
            // Keep imageCode requests, skip lines that also carry an smsCode payload
            if (str.contains("data={\"imageCode\"") && !str.contains("smsCode={\"")) {
                System.out.println(str);
                // Cut out the JSON object: from the '{' after "data=" to the first '}' after it
                int begin = str.indexOf("data={\"imageCode\":");
                int jsonEnd = begin < 0 ? -1 : str.indexOf('}', begin);
                if (jsonEnd < 0) {
                    continue; // skip the line rather than parsing an empty string
                }
                String a = str.substring(begin + 5, jsonEnd + 1);
                System.out.println(a);
                TsatSampleHorizontalLog satSampleHorizontalLog = JSON.parseObject(a, TsatSampleHorizontalLog.class);
                list.add(satSampleHorizontalLog);
            }
        }
        System.out.println(list.size());
        long end = System.currentTimeMillis();
        System.out.println("---------------- read time (ms): -------------" + (end - start));
        // Split the List into sublists of 1000 records and hand each one to a pool thread
        List<List<TsatSampleHorizontalLog>> lists = Lists.partition(list, 1000);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        long startInsert = System.currentTimeMillis();
        for (List<TsatSampleHorizontalLog> sub : lists) {
            asyncService.executeAsync(sub, countDownLatch);
        }
        countDownLatch.await(); // wait until every batch has called countDown()
        long endInsert = System.currentTimeMillis();
        System.out.println("---------------- insert finished, time (ms): -------------" + (endInsert - startInsert));
    } catch (IOException e) { // also covers FileNotFoundException
        e.printStackTrace();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        e.printStackTrace();
    }
}
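The filtering and substring logic is identical in both versions, so it can be pulled out into one pure method and tested without a file or a database. A sketch, assuming (as the original code does) that the payload is a flat JSON object with no nested braces; PayloadExtractor and extract are hypothetical names, and the sample log line is invented.

```java
import java.util.Optional;

public class PayloadExtractor {

    private static final String MARKER = "data={\"imageCode\":";
    private static final String SMS_MARKER = "smsCode={\"";

    // Returns the JSON object that follows "data=" on a log line, or empty if the line is not
    // an imageCode request, also carries an smsCode payload, or has no closing brace.
    static Optional<String> extract(String line) {
        if (line == null || line.contains(SMS_MARKER)) {
            return Optional.empty();
        }
        int begin = line.indexOf(MARKER);
        if (begin < 0) {
            return Optional.empty();
        }
        int jsonStart = begin + "data=".length();   // index of the opening '{'
        int jsonEnd = line.indexOf('}', jsonStart);  // first closing brace after it
        if (jsonEnd < 0) {
            return Optional.empty();
        }
        return Optional.of(line.substring(jsonStart, jsonEnd + 1));
    }

    public static void main(String[] args) {
        // Hypothetical log line; the real Tomcat log format is not shown in the post
        String line = "2022-11-01 10:00:00 INFO params: data={\"imageCode\":\"1234\",\"score\":\"5\"}";
        System.out.println(extract(line).orElse("<no payload>")); // prints {"imageCode":"1234","score":"5"}
    }
}
```

Each reading loop then reduces to extract(line).ifPresent(json -> list.add(JSON.parseObject(json, TsatSampleHorizontalLog.class))).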
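When running the BufferedReader version on files larger than 1 GB, give the JVM more heap, otherwise it is likely to fail with java.lang.OutOfMemoryError: Java heap space. BufferedReader itself only holds a small chunk of the file; the memory goes to list, which keeps every parsed record until the insert phase starts. In IDEA, set this in the run configuration's VM options:
-Xms1024m -Xmx1024m
(The original also passed -XX:MaxPermSize=1024m. PermGen was removed in Java 8, so that flag has no effect there and may be rejected outright by recent JDKs; heap size is controlled by -Xmx.)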
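Because the heap is consumed by the accumulated list rather than by BufferedReader, an alternative to raising -Xmx is to hand records off in batches while reading, so that only one batch sits in memory at a time. A sketch, with a generic Consumer standing in for the database write (StreamingBatcher and process are hypothetical names). One design consequence: the number of batches is no longer known before reading starts, so a CountDownLatch sized up front, as in the original, would have to be replaced, for example by collecting one Future per batch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StreamingBatcher {

    // Reads lines one at a time and passes matching lines to `sink` in batches of `batchSize`,
    // so at most one batch is held in memory instead of every record in the file.
    static int process(BufferedReader in, String marker, int batchSize,
                       Consumer<List<String>> sink) throws IOException {
        List<String> batch = new ArrayList<>(batchSize);
        int total = 0;
        String line;
        while ((line = in.readLine()) != null) {
            if (line.contains(marker)) {
                batch.add(line);
                total++;
                if (batch.size() == batchSize) {
                    sink.accept(batch);
                    batch = new ArrayList<>(batchSize); // fresh list: the sink may keep the old one
                }
            }
        }
        if (!batch.isEmpty()) {
            sink.accept(batch); // flush the last, partial batch
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 2500; i++) {
            sb.append("data={\"imageCode\":").append(i).append("}\n");
        }
        List<Integer> batchSizes = new ArrayList<>();
        int total = process(new BufferedReader(new StringReader(sb.toString())),
                "data={\"imageCode\"", 1000, b -> batchSizes.add(b.size()));
        System.out.println(total + " lines in batches " + batchSizes); // prints 2500 lines in batches [1000, 1000, 500]
    }
}
```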
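With reading fast enough, the next problem was insert throughput. I used Spring's ThreadPoolTaskExecutor: the List is split into sublists of 1000 records, and each sublist is submitted to the pool as one task. Inside each task the records are still inserted one at a time; the speed-up comes from running up to 30 such tasks in parallel.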
Creating and initializing the thread pool:
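import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @author Heruixing
 * @version 1.0.0
 * @description: Initializes the thread pool
 * @Date 15:10 2022/12/28
 */
@Configuration
@EnableAsync
public class ExecutorConfig {
    // Core pool size
    private int corePoolSize = 30;
    // Maximum pool size (only used once the queue is full; equal to core here)
    private int maxPoolSize = 30;
    // Capacity of the task queue
    private int queueCapacity = 999888;
    // Thread name prefix
    private String namePrefix = "async-importDB-";

    @Bean(name = "asyncServiceExcutor")
    public Executor asyncServiceExecutor() {
        System.out.println("start asyncServiceExecutor!");
        // The original used VisiableThreadPoolTaskExecutor, a custom subclass not shown in the post;
        // the plain Spring class is used here so the snippet compiles on its own.
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix(namePrefix);
        // When the queue is full, run the task on the submitting thread instead of rejecting it
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}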
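For readers without Spring, the configuration above maps fairly directly onto a plain java.util.concurrent.ThreadPoolExecutor; Spring's ThreadPoolTaskExecutor builds one internally and uses a LinkedBlockingQueue when the queue capacity is positive. The sketch below reproduces the same settings (PoolConfigSketch and newImportPool are hypothetical names; the 60-second keep-alive matches Spring's default keepAliveSeconds). It also makes the sizing visible: with core and max both at 30 the pool never grows, and CallerRunsPolicy only comes into play once 999,888 tasks are already waiting in the queue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolConfigSketch {

    // Plain-JDK pool with the same knobs as the Spring config: core = max threads,
    // a bounded FIFO queue, prefixed thread names, and CallerRunsPolicy when the queue is full.
    static ThreadPoolExecutor newImportPool(int threads, int queueCapacity, String namePrefix) {
        AtomicInteger seq = new AtomicInteger(1);
        ThreadFactory factory = r -> new Thread(r, namePrefix + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                threads, threads,                  // corePoolSize, maximumPoolSize
                60L, TimeUnit.SECONDS,             // keep-alive for threads above core (none here)
                new LinkedBlockingQueue<>(queueCapacity),
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newImportPool(30, 999888, "async-importDB-");
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```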
The async worker that inserts one batch:
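import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncServiceImpl implements AsyncService {
    @Autowired
    private MydRecoverService mydRecoverService;

    @Override
    @Async("asyncServiceExcutor") // run on the pool bean defined in ExecutorConfig
    public void executeAsync(List<TsatSampleHorizontalLog> list, CountDownLatch countDownLatch) {
        try {
            System.out.println("start executeAsync " + Thread.currentThread().getName());
            // Insert the batch one record at a time
            for (TsatSampleHorizontalLog log : list) {
                this.mydRecoverService.insert(log);
            }
        } finally {
            // Always count down, even if an insert throws, so the caller's await() cannot hang
            countDownLatch.countDown();
        }
    }
}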
Task dispatch snippet:
@Autowired
private AsyncService asyncService;

// Inside the import method:
// Split the List into sublists of 1000 records and hand each one to a pool thread
List<List<TsatSampleHorizontalLog>> lists = Lists.partition(list, 1000);
// Coordinates the threads; effectively a counter that starts at the number of batches
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
long startInsert = System.currentTimeMillis();
for (List<TsatSampleHorizontalLog> sub : lists) {
    // Dispatch the batch to the async executor
    asyncService.executeAsync(sub, countDownLatch);
}
// Block until every batch has counted down
countDownLatch.await();
long endInsert = System.currentTimeMillis();
System.out.println("---------------- insert finished, time (ms): -------------" + (endInsert - startInsert));
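The split, dispatch and wait steps can be tried without Spring, Guava or a database. A sketch under those assumptions: partition is a hand-written stand-in for Guava's Lists.partition, a fixed-size ExecutorService replaces the @Async pool, and a Consumer replaces mydRecoverService.insert (LatchDispatchSketch and dispatch are hypothetical names). It also counts failed batches, which the original does not: with a void @Async method, an exception thrown by insert never reaches the caller, so await() returns normally even when some batches failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class LatchDispatchSketch {

    // Stand-in for Guava's Lists.partition: consecutive sublist views of `size` (last may be shorter).
    static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            parts.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return parts;
    }

    // Submits one task per batch and blocks until every task has counted down.
    // Returns the number of batches whose insert threw.
    static <T> int dispatch(List<T> records, int batchSize, ExecutorService pool,
                            Consumer<List<T>> insertBatch) throws InterruptedException {
        List<List<T>> batches = partition(records, batchSize);
        CountDownLatch latch = new CountDownLatch(batches.size());
        AtomicInteger failed = new AtomicInteger();
        for (List<T> batch : batches) {
            pool.execute(() -> {
                try {
                    insertBatch.accept(batch);
                } catch (RuntimeException e) {
                    failed.incrementAndGet(); // record the failure instead of losing it
                } finally {
                    latch.countDown(); // same placement as in executeAsync
                }
            });
        }
        latch.await();
        return failed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            records.add(i);
        }
        AtomicInteger inserted = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int failed = dispatch(records, 1000, pool, batch -> inserted.addAndGet(batch.size()));
        pool.shutdown();
        System.out.println(inserted.get() + " inserted, " + failed + " failed batches"); // prints 2500 inserted, 0 failed batches
    }
}
```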
Reading one 2 GB log file, cleaning the data and writing it to the database (about 200,000 rows) took roughly 6 minutes in total.
