`
yang_min
  • 浏览: 338175 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

java(多线程)实现高性能数据同步

阅读更多

 需要将生产环境上Infoxmix里的数据原封不动的Copy到另一台 Oracle数据库服务器上,然后对Copy后的数据作些漂白处理。为了将人为干预的因素降到最低,在系统设计时采用Java代码对数据作Copy,思路 如图: 



    首 先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从 ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy 一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提交多少。我想能不能用多 线程处理,多个人干活总比一个人干活速度要快。
    假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发这样的话速度至少会提高好多倍,按照这 个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用 java.util.concurrent.atomic.AtomicLong,程序一运行,传输速度飞快CPU利用率在70%~90%,现在一秒钟可 以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。

在查询的时候我用了如下语句

 

String queryStr = "SELECT * FROM xx";
ResultSet coreRs = PreparedStatement.executeQuery(queryStr);

 

 

实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet, 那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”它 会自动查询下一批数据,你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成,因为JDBC驱动会根据实际情况自动调整 Fetch的数量。另外性能也与网线的带宽有直接的关系。
相关代码

 

 

package com.dlbank.domain;  
  
import java.sql.Connection;  
import java.sql.PreparedStatement;  
import java.sql.ResultSet;  
import java.sql.Statement;  
import java.util.List;  
import java.util.concurrent.atomic.AtomicLong;  
  
import org.apache.log4j.Logger;  
  
/** 
 *<p>title: 数据同步类 </p>   
 *<p>Description: 该类用于将生产核心库数据同步到开发库</p>   
 *@author Tank Zhang  
 */  
public class CoreDataSyncImpl implements CoreDataSync {  
      
    private List<String> coreTBNames; //要同步的核心库表名  
    private ConnectionFactory connectionFactory;  
    private Logger log = Logger.getLogger(getClass());  
      
    private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数  
      
    private int syncThreadNum;  //同步的线程数  
  
    @Override  
    public void syncData(int businessType) throws Exception {  
          
        for (String tmpTBName : coreTBNames) {  
            log.info("开始同步核心库" + tmpTBName + "表数据");  
            // 获得核心库连接  
            Connection coreConnection = connectionFactory.getDMSConnection(4);  
            Statement coreStmt = coreConnection.createStatement();  
            //为每个线程分配结果集  
            ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);  
            coreRs.next();  
            //总共处理的数量  
            long totalNum = coreRs.getLong(1);  
            //每个线程处理的数量  
            long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));   
            log.info("共需要同步的数据量:"+totalNum);  
            log.info("同步线程数量:"+syncThreadNum);  
            log.info("每个线程可处理的数量:"+ownerRecordNum);  
            // 开启五个线程向目标库同步数据  
            for(int i=0; i < syncThreadNum; i ++){  
                StringBuilder sqlBuilder = new StringBuilder();  
                //拼装后SQL示例  
                //Select * From dms_core_ds Where id between 1 And 657398  
                //Select * From dms_core_ds Where id between 657399 And 1314796  
                //Select * From dms_core_ds Where id between 1314797 And 1972194  
                //Select * From dms_core_ds Where id between 1972195 And 2629592  
                //Select * From dms_core_ds Where id between 2629593 And 3286990  
                //..  
                sqlBuilder.append("Select * From ").append(tmpTBName)  
                        .append(" Where id between " ).append(i * ownerRecordNum +1)  
                        .append( " And ")  
                        .append((i * ownerRecordNum + ownerRecordNum));  
                Thread workThread = new Thread(  
                        new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));  
                workThread.setName("SyncThread-"+i);  
                workThread.start();  
            }  
            while (currentSynCount.get() < totalNum);  
            //休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,
			//因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作);  
            //Thread.sleep(1000 * 3);  
            log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据");  
        }  
    }// end for loop  
      
    public void setCoreTBNames(List<String> coreTBNames) {  
        this.coreTBNames = coreTBNames;  
    }  
  
    public void setConnectionFactory(ConnectionFactory connectionFactory) {  
        this.connectionFactory = connectionFactory;  
    }  
      
    public void setSyncThreadNum(int syncThreadNum) {  
        this.syncThreadNum = syncThreadNum;  
    }  
      
    //数据同步线程  
    final class WorkerHandler implements Runnable {  
        ResultSet coreRs;  
        String queryStr;  
        int businessType;  
        String targetTBName;  
        public WorkerHandler(String queryStr,int businessType,String targetTBName) {  
            this.queryStr = queryStr;  
            this.businessType = businessType;  
            this.targetTBName = targetTBName;  
        }  
        @Override  
        public void run() {  
            try {  
                //开始同步  
                launchSyncData();  
            } catch(Exception e){  
                log.error(e);  
                e.printStackTrace();  
            }  
        }  
        //同步数据方法  
        void launchSyncData() throws Exception{  
            // 获得核心库连接  
            Connection coreConnection = connectionFactory.getDMSConnection(4);  
            Statement coreStmt = coreConnection.createStatement();  
            // 获得目标库连接  
            Connection targetConn = connectionFactory.getDMSConnection(businessType);  
            targetConn.setAutoCommit(false);// 设置手动提交  
            PreparedStatement targetPstmt =
			 targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");  
            ResultSet coreRs = coreStmt.executeQuery(queryStr);  
            log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);  
            int batchCounter = 0; //累加的批处理数量  
            while (coreRs.next()) {  
                targetPstmt.setString(1, coreRs.getString(2));  
                targetPstmt.setString(2, coreRs.getString(3));  
                targetPstmt.setString(3, coreRs.getString(4));  
                targetPstmt.setString(4, coreRs.getString(5));  
                targetPstmt.setString(5, coreRs.getString(6));  
                targetPstmt.addBatch();  
                batchCounter++;  
                currentSynCount.incrementAndGet();//递增  
                if (batchCounter % 10000 == 0) { //1万条数据一提交  
                    targetPstmt.executeBatch();  
                    targetPstmt.clearBatch();  
                    targetConn.commit();  
                }  
            }  
            //提交剩余的批处理  
            targetPstmt.executeBatch();  
            targetPstmt.clearBatch();  
            targetConn.commit();  
            //释放连接   
            connectionFactory.release(targetConn, targetPstmt,coreRs);  
        }  
    }  
}  

 

分享到:
评论
3 楼 heipacker 2014-05-15  
建议:java(多线程)实现高性能数据同步==>java(多线程)实现数据拷贝
2 楼 qifan.yang 2013-09-13  
文章线程分配任务有个问题,可能最后那个线程只分配了很少的任务,
1 楼 qifan.yang 2013-09-13  
不错不错,赞一个

相关推荐

    java 多线程同步

    java.util.concurrent 包含许多线程安全、测试良好、高性能的并发构建块。不客气地说,创建 java.util.concurrent 的目的就是要实现 Collection 框架对数据结构所执行的并发操作。通过提供一组可靠的、高性能并发...

    NIO框架Netty实现高性能高并发

    Java异步NIO框架Netty实现高性能高并发无标题笔记 1. 背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨 节点...

    java高性能数据库连接池V5.0

    保证了连接池在真多线程上同步访问的安全性。 里面包含了一个公开的接口。使用这个接口里面的函数。可以轻易创建使用数据库连接池服务。 使用一个守护线程维护这个连接池,完全自动化。 版本说明:V5.0 。在网站...

    多线程操作实例源码,,

    浏览器就是一个很好的多线程的例子,在浏览器中你可以在下载JAVA小应用程序或图象的同时滚动页面,在访问新页面时,播放动画和声音,打印文件等。  多线程的好处在于可以提高CPU的利用率——任何一个程序员都不希望...

    多线程操作实例源码

    浏览器就是一个很好的多线程的例子,在浏览器中你可以在下载JAVA小应用程序或图象的同时滚动页面,在访问新页面时,播放动画和声音,打印文件等。  多线程的好处在于可以提高CPU的利用率——任何一个程序员都不希望...

    java开源包4

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    JAVA上百实例源码以及开源项目源代码

     Java实现的FTP连接与数据浏览程序,实现实例化可操作的窗口。  部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText(); //得到服务器地址  ...

    java高并发相关知识点.docx

    线程:Java多线程的实现方式,包括继承Thread类和实现Runnable接口。 锁:Java中的锁机制,包括synchronized关键字和ReentrantLock类。 线程池:Java中的线程池机制,包括线程池的创建、执行任务、关闭等操作。 并发...

    JAVA CAS实现原理与使用.docx

    (1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。 (2)一个线程持有锁会导致其它所有需要此锁的线程挂起。 (3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致...

    JUC多线程学习个人笔记

    JUC(Java Util Concurrent)是Java中用于并发编程的工具包,提供了一组接口和类,用于处理多线程和并发操作。JUC提供了一些常用的并发编程模式和工具,如线程池、并发集合、原子操作等。 JUC的主要特点包括: ...

    java开源包3

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    Java并发编程相关源码集 包括多任务线程,线程池等.rar

    多个线程竞争问题、多个线程多个锁问题、创建一个缓存的线程池、多线程使用Vector或者HashTable的示例(简单线程同步问题)、PriorityBlockingQueue示例、高性能无阻塞无界队列: ConcurrentLinkedQueue、DelayQueue...

    java开源包11

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    java开源包6

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    java并发编程:juc、aqs

    Java 并发编程中的 JUC(java.util.concurrent)库以及其核心组件 AQS(AbstractQueuedSynchronizer)在构建高性能、可伸缩性的多线程应用方面具有重要的地位。 AQS 是 JUC 中的核心组件,它提供了一个框架,让...

    java开源包9

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    java开源包101

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    java开源包5

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    java开源包8

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    java开源包10

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

Global site tag (gtag.js) - Google Analytics