背景:

通过tomcat日志,解析请求的json串,并写入数据库。应用总4台,一台服务器一个月日志大概在20G左右,我把日志拆分为2G一个txt,10个文件,不拆分的话,用FileInputStream + Scanner,读取20个G的txt估计得跑十多天吧。

在使用Scanner读取日志日志一开始很快,到后面IO消耗大了以后跟蜗牛一样,后面改用BufferedReader放内存里面再读,效率提升100倍,一点儿都不夸张。

使用Scanner代码

	 try {
            Long start = System.currentTimeMillis();
            //打开文件创建数据流
            FileInputStream inputStream = null;
            Scanner sc = null;
            inputStream = new FileInputStream("G:\\满意度数据恢复\\catalina_202211b\\catalina_202211b_01.txt");
            sc = new Scanner(inputStream, "UTF-8");
            String str; //定义String变量用来保存每一次读到的每一行的数据
            List<TsatSampleHorizontalLog> list = new ArrayList<TsatSampleHorizontalLog>();

//            CountDownLatch countDownLatch = new CountDownLatch(reader.getLineNumber());
            while ( sc.hasNextLine()) {
                String a = sc.nextLine();
                if (a.contains("data={\"imageCode\"") && !a.contains("smsCode={\"")) {
                    if (a.length() > 0) {
                        System.out.println(a);
                        if (a.indexOf("}") > 1000) {
                            a = a.substring(a.indexOf("data={\"imageCode\":") + 5, a.indexOf("}") +1);
                        }
                        System.out.println(a);
                        TsatSampleHorizontalLog satSampleHorizontalLog = JSON.parseObject(a, TsatSampleHorizontalLog.class);
                        list.add(satSampleHorizontalLog);
                    }
                }

            }
            System.out.println(list.size());
            Long end = System.currentTimeMillis();
            System.out.println("----------------数据拉取耗时:-------------" + (end - start));

//将List才分为1000条一个List,并创建一个线程
        List<List<TsatSampleHorizontalLog>> lists = Lists.partition(list,1000);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        Long startInsert = System.currentTimeMillis();
        for(List<TsatSampleHorizontalLog> sub: lists) {
            asyncService.executeAsync(sub,countDownLatch);
        }
        countDownLatch.await();
        Long endInser = System.currentTimeMillis();
        System.out.println("----------------数据插入结束-耗时:-------------" + (endInser - startInsert));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

使用BufferedReader代码

try {
            Long start = System.currentTimeMillis();
            //打开文件创建数据流
            BufferedReader in = new BufferedReader(new FileReader("G:\\满意度数据恢复\\catalina_202211b\\catalina_202211b_02.txt"));
//            LineNumberReader reader = new LineNumberReader(in);
            String str; //定义String变量用来保存每一次读到的每一行的数据
            List<TsatSampleHorizontalLog> list = new ArrayList<TsatSampleHorizontalLog>();
//            CountDownLatch countDownLatch = new CountDownLatch(reader.getLineNumber());
            while ((str = in.readLine()) != null) {
                if (str.contains("data={\"imageCode\"") && !str.contains("smsCode={\"")) {
                    if (str.length() > 0) {
                        String a = "";
                        System.out.println(str);
                        if (str.indexOf("}") > 1000) {
                            a = str.substring(str.indexOf("data={\"imageCode\":") + 5, str.indexOf("}") +1);
                        }
                        System.out.println(a);
                        TsatSampleHorizontalLog satSampleHorizontalLog = JSON.parseObject(a, TsatSampleHorizontalLog.class);
                        list.add(satSampleHorizontalLog);
                    }
                }

            }
            System.out.println(list.size());
            Long end = System.currentTimeMillis();
            System.out.println("----------------数据拉取耗时:-------------" + (end - start));

//            for (TsatSampleHorizontalLog log : list) {
//                this.mydRecoverService.insert(log);
//            }
//
//将List拆分为1000条一个List,并获取一个线程处理
        List<List<TsatSampleHorizontalLog>> lists = Lists.partition(list,1000);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        Long startInsert = System.currentTimeMillis();
        for(List<TsatSampleHorizontalLog> sub: lists) {
            asyncService.executeAsync(sub,countDownLatch);
        }
        countDownLatch.await();
        Long endInser = System.currentTimeMillis();
        System.out.println("----------------数据插入结束-耗时:-------------" + (endInser - startInsert));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

使用BufferedReader跑大于1G文件时候,最好把idea内存设置大一些,不然很有可能回报Java heap space内存溢出

-Xms1024m -Xmx1024m -XX:MaxPermSize=1024m

读取文件的效率解决了,接下来批量插入数据库效率问题,利用ThreadPoolTaskExecutor多线程,把List拆分成1000条一个List的集合,达到条数并获取一个线程去执行。

创建线程池初始化

/**
 * @author Heruixing
 * @version 1.0.0
 * @description:初始化线程池
 * @ClassName:${class}
 * @Date 15:10 2022/12/28
 */

@Configuration
@EnableAsync
public class ExecutorConfig {

    //配置核心线程数
    private int corePoolSize = 30;

    //最大线程大小
    private int maxPoolSize = 30;

    //查询队列
    private int queueCapacity = 999888;

    //线程名称
    private String namePrefix = "async-importDB-";


    @Bean(name = "asyncServiceExcutor")
    public Executor asyncServiceExecutor() {
        System.out.println("start asyncServiceExecutor!");

        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();

        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix(namePrefix);

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;

    }


}

构造线程执行器

@Service
public class AsyncServiceImpl implements AsyncService {


    @Autowired
    private MydRecoverService mydRecoverService;

    @Override
    @Async("asyncServiceExcutor")
    public void executeAsync(List<TsatSampleHorizontalLog> list, CountDownLatch countDownLatch) {
        try {
            System.out.println("start executeAsync" + Thread.currentThread().getName());
            for (TsatSampleHorizontalLog log : list) {
                this.mydRecoverService.insert(log);
            }
        } finally {
            countDownLatch.countDown();
        }
    }

任务调度代码片段

@Autowired

private AsyncService asyncService;
//将List拆分为1000条一个List,并获取一个线程处理
List<List<TsatSampleHorizontalLog>> lists = Lists.partition(list,1000);
//控制线程协调器,也可以叫计数器
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        Long startInsert = System.currentTimeMillis();
        for(List<TsatSampleHorizontalLog> sub: lists) {
	    //调度线程器执行
            asyncService.executeAsync(sub,countDownLatch);
        }
        countDownLatch.await();
        Long endInser = System.currentTimeMillis();
        System.out.println("----------------数据插入结束-耗时:-------------" + (endInser - startInsert));

读取2个G日志文件,并清洗数据到入库,大概20W行数据,耗时6分钟左右

微信图片_20221229223523.png