博客
关于我
强烈建议你试试无所不能的 ChatGPT,快点击我
Java 多线程插入 MySQL 数据库
阅读量:4182 次
发布时间:2019-05-26

本文共 8620 字,大约阅读时间需要 28 分钟。

一、十个线程依次插入200个表

package com.xiaobu.base.util;import com.google.common.util.concurrent.ThreadFactoryBuilder;import lombok.extern.slf4j.Slf4j;import java.sql.Connection;import java.sql.PreparedStatement;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicLong;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2020/12/8 9:07 * @description */@Slf4jpublic class MultiTableInport {
private static final String TABLE_PRE = "random_code_"; private static final Long COUNT=50000000L; private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%s").build(); public static void multiTableInport(){
for (int index = 1; index < 201; index++) {
long atomic=(index-1)*COUNT; AtomicLong atomicIndex = new AtomicLong(atomic); String tableName = TABLE_PRE + index; multiThreadImport(10,tableName,atomicIndex); } } public static void multiThreadImport(final int threadNum,String tableName,AtomicLong atomicIndex) {
final CountDownLatch cdl = new CountDownLatch(threadNum); long starttime = System.currentTimeMillis(); ThreadPoolExecutor executor = new ThreadPoolExecutor(threadNum, 15, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50000), threadFactory); for (int k = 1; k <= threadNum; k++) {
executor.execute(() -> {
Connection conn = JDBCUtils.getConnection(); PreparedStatement preparedStatement = null; try {
assert conn != null; conn.setAutoCommit(false); String sql = "INSERT INTO " + tableName + "(id) values(?)"; preparedStatement = conn.prepareStatement(sql); for (long i = 1; i <= COUNT / threadNum; i++) {
long id = atomicIndex.incrementAndGet(); preparedStatement.setLong(1, id); preparedStatement.addBatch(); if (i % 10000 == 0) {
preparedStatement.executeBatch(); conn.commit(); preparedStatement.clearBatch(); } } preparedStatement.executeBatch(); preparedStatement.clearBatch(); conn.commit(); preparedStatement.close(); } catch (Exception e) {
e.printStackTrace(); } finally {
cdl.countDown(); JDBCUtils.closeStatement(preparedStatement); JDBCUtils.closeConnection(conn); } }); } try {
cdl.await(); long spendtime = System.currentTimeMillis() - starttime; System.out.println(threadNum + "个线程花费时间:" + spendtime/1000+"S"); log.info("{}个线程花费时间:{}S" ,threadNum , spendtime/1000); } catch (InterruptedException e) {
e.printStackTrace(); } executor.shutdown(); } public static void main(String[] args) {
multiTableInport(); }}
-- Recreate the target table; IF EXISTS keeps the script runnable on a database
-- where the table has not been created yet.
DROP TABLE IF EXISTS `random_code_1`;
CREATE TABLE `random_code_1` (
  `id` bigint(20) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

二、两个线程分批插入四十个表 五个库

/**
 * Inserts rows into table {@code TABLE_PRE + index} through {@code primaryMapper}, using
 * {@code threadNum} workers that each write {@code COUNT / threadNum} rows, and blocks until
 * all workers have finished.
 *
 * <p>Ids are drawn from one shared counter that starts at {@code (index - 1) * COUNT + 1}.
 * Rows are handed to the mapper in batches of 10000; a trailing partial batch is submitted too.
 *
 * @param threadNum number of worker threads
 * @param index     table-name suffix
 */
public void insertDatabase1(int threadNum, int index) {
    final String tableName = TABLE_PRE + index;
    long indexNum = (index - 1) * COUNT + 1;
    final AtomicLong atomicId = new AtomicLong(indexNum);
    final AtomicLong count = new AtomicLong(0);
    final CountDownLatch cdl = new CountDownLatch(threadNum);
    long startTime = System.currentTimeMillis();
    // maximumPoolSize must never be below corePoolSize, otherwise the constructor throws.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            threadNum, Math.max(threadNum, 4), 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(50000), threadFactory);
    try {
        for (int k = 1; k <= threadNum; k++) {
            executor.execute(() -> {
                try {
                    long rows = COUNT / threadNum;
                    // A fresh list per batch: the mapper may still hold the previous one.
                    List<RandomCode> batch = new ArrayList<>();
                    for (long i = 0; i < rows; i++) {
                        RandomCode randomCode = new RandomCode();
                        randomCode.setId(atomicId.getAndIncrement());
                        batch.add(randomCode);
                        if (batch.size() == 10000) {
                            submitBatch(tableName, batch, count);
                            batch = new ArrayList<>();
                        }
                    }
                    if (!batch.isEmpty()) {
                        submitBatch(tableName, batch, count);
                    }
                } catch (Exception e) {
                    log.error("Insert into table {} failed", tableName, e);
                } finally {
                    cdl.countDown();
                }
            });
        }
        cdl.await();
        long spendtime = System.currentTimeMillis() - startTime;
        System.out.println(threadNum + "个线程花费时间:" + spendtime / 1000 + "S");
    } catch (InterruptedException e) {
        // Restore the flag so callers further up the stack can observe the interruption.
        Thread.currentThread().interrupt();
        log.warn("Interrupted while waiting for inserts into table {}", tableName, e);
    } finally {
        executor.shutdown();
    }
}

/** Hands one batch to the mapper and logs the running number of submitted batches. */
private void submitBatch(String tableName, List<RandomCode> batch, AtomicLong count) {
    count.getAndIncrement();
    primaryMapper.insertRandomCaodeList(tableName, batch);
    log.info("往表{}提交第{}次数据", tableName, count);
}
/**
 * Calls {@code randomCodeService.insertDatabase1(2, n)} once for every n from 1 to 40,
 * in ascending order.
 */
@Test
public void insertDatabase1() {
    final int lastIndex = 40;
    int tableIndex = 1;
    while (tableIndex <= lastIndex) {
        randomCodeService.insertDatabase1(2, tableIndex);
        tableIndex++;
    }
}
package com.xiaobu.entity.vo;import com.xiaobu.entity.RandomCode;import com.xiaobu.mapper.primary.PrimaryMapper;import lombok.extern.slf4j.Slf4j;import java.util.AbstractCollection;import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicLong;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2020/12/8 13:46 * @description */@Slf4jpublic class PrimaryTask implements Runnable {
/** * 数据访问层 */ private PrimaryMapper primaryMapper; /** * 插入的数据 */ private List
list; /** * 表名 */ private String tabelName; private AtomicLong count; public PrimaryTask(PrimaryMapper primaryMapper, String tabelName, AtomicLong count) {
this.primaryMapper = primaryMapper; this.tabelName = tabelName; this.count = count; } @Override public void run() {
this.list = new ArrayList<>(); AtomicLong insertAcount = new AtomicLong(0); for (long i = this.count.get(); i <= this.count.get() + 50000000 / 2; i++) {
RandomCode randomCode = new RandomCode(); randomCode.setId(this.count.getAndIncrement()); this.list.add(randomCode); if (i % 10000 == 0) {
insertAcount.incrementAndGet(); try {
this.primaryMapper.insertRandomCaodeList(this.tabelName, this.list); log.info("在{}表第{}批次插入成功", this.tabelName, insertAcount.get()); } catch (Exception e) {
log.info("在{}表第{}批次插入失败", this.tabelName, insertAcount.get()); } this.list = null; this.list = new ArrayList<>(); } } }}
/**
 * Runs {@code threadNum} {@code PrimaryTask} instances against table {@code TABLE_PRE + index}
 * and blocks until every one of them has returned.
 *
 * <p>All tasks share one id counter that starts at {@code (index - 1) * COUNT + 1}.
 *
 * @param threadNum number of tasks / worker threads
 * @param index     table-name suffix
 */
public void excuteTask(int threadNum, int index) {
    final CountDownLatch cdl = new CountDownLatch(threadNum);
    long indexNum = (index - 1) * COUNT + 1;
    AtomicLong atomicId = new AtomicLong(indexNum);
    String tableName = TABLE_PRE + index;
    long startTime = System.currentTimeMillis();
    // maximumPoolSize must never be below corePoolSize, otherwise the constructor throws.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            threadNum, Math.max(threadNum, 4), 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(50000), threadFactory);
    try {
        for (int k = 1; k <= threadNum; k++) {
            PrimaryTask primaryTask = new PrimaryTask(primaryMapper, tableName, atomicId);
            executor.execute(() -> {
                try {
                    primaryTask.run();
                } finally {
                    // The task itself has no reference to the latch, so release it here;
                    // otherwise await() below would never return.
                    cdl.countDown();
                }
            });
        }
        cdl.await();
        long spendtime = System.currentTimeMillis() - startTime;
        System.out.println(threadNum + "个线程花费时间:" + spendtime / 1000 + "S");
    } catch (InterruptedException e) {
        // Restore the flag so callers further up the stack can observe the interruption.
        Thread.currentThread().interrupt();
    } finally {
        executor.shutdown();
    }
}
/** Calls {@code randomCodeService.excuteTask} with the arguments 2 and 1. */
@Test
public void excuteTask() {
    final int threadNum = 2;
    final int tableIndex = 1;
    randomCodeService.excuteTask(threadNum, tableIndex);
}

转载地址:http://ozgai.baihongyu.com/

你可能感兴趣的文章
Git命令还原未提交的变更
查看>>
Linux系统中环境变量的配置
查看>>
Linux系统中配置脚本程序开机启动
查看>>
让Linux系统上的nginx支持php程序
查看>>
源码编译安装LNMP环境之Nginx篇
查看>>
源码编译安装LNMP环境之PHP篇
查看>>
Linux中rpm工具使用教程
查看>>
Linux中yum工具使用教程
查看>>
C++字符串函数
查看>>
mknod详解
查看>>
linux中的run-level何解?
查看>>
Linux内核编译详解(转自linuxSir)
查看>>
实模式,保护模式与V86模式
查看>>
628. Maximum Product of Three Numbers(排序)
查看>>
Linux内核-------同步机制(二)
查看>>
面试题31-------连续子数组的最大和(数组)
查看>>
epoll 实现Chat
查看>>
21. Merge Two Sorted Lists(链表)
查看>>
2. Add Two Numbers(链表)
查看>>
637. Average of Levels in Binary Tree(Tree)
查看>>