Mysql海量資料遷移

NO IMAGE

問題

專案中遇到對錶資料全量遷移,表資料量比較大,基本上是在百萬、千萬級別。

方案

limit方式

首先最開始的方案就是簡單粗暴最直接的Limit查詢,這種方案對少量資料是可行的,但是到後面資料量大的時候
再進行limit x,y 這個時候效率會很低,會執行全表掃描,例如

select * from table limit 150000,1000;
  • 優點

    • 實現邏輯簡單
  • 缺點

    • limit資料量大的時候效率低

索引方式

此方式是直接獲取的滿足條件的最小和最大ID(主鍵),然後通過ID區間的批量BETWEEN AND方式來獲取資料

  • 優點

    • 每次查詢都是通過索引ID來獲取資料,執行效率高
  • 缺點

    • 在獲取min(id)和max(id)會執行全表掃描,比較慢
    • id段不連續,會出現不符合條件的空查詢
    public File getData() {
        long min = x;
        long max = y;
        if (min == 0 || max == 0) {
            return null;
        }
        long size = max - min   1;
        int batchNum = 5000;
        int pageNum = size % batchNum == 0 ? (int) (size / batchNum) : (int) (size / batchNum)   1;

        ExecutorService es = Executors.newFixedThreadPool(5);
        Future<Boolean>[] result = new Future[pageNum];

        for (int offset = 0; offset < pageNum; offset  ) {
            Future<Boolean> future = es.submit(new DataFuture(min, batchNum, offset));
            result[i] = future;
        }

        for (Future<Boolean> future : result) {
            try {
                future.get();
            } catch (Exception e) {
                logger.error("callBack error", e);
            }
        }
        es.shutdown();
 }

DataFuture執行分批資料獲取


   class DataFuture implements Callable<Boolean> {
        ResultCallBack<Set<String>> callBack;
        int offset;
        long min;
        int batchNum;

        public DataFuture(long min, int batchNum, int offset) {
            super();
            this.batchNum = batchNum;
            this.offset = offset;
            this.min = min;
        }

        @Override
        public Boolean call() throws Exception {
            long start = offset * batchNum   min;
            long end = start   batchNum - 1;
            // 資料庫執行 select * from table where id BETWEEN start and end
            Set<String> querySet = getData(start, end);
            return true;
        }
    }

ResultSet方式

採用流方式獲取資料

  • 當statement設定以下屬性時,採用的是流資料接收方式,每次只從伺服器接收部份資料,
    直到所有資料處理完畢,不會發生JVM OOM。
  setResultSetType(ResultSet.TYPE_FORWARD_ONLY);
  setFetchSize(Integer.MIN_VALUE); 
  • 呼叫statement的enableStreamingResults方法,實際上enableStreamingResults方法內部封裝的就是第1種方式。

  • 設定連線屬性useCursorFetch=true (5.0版驅動開始支援),statement以TYPE_FORWARD_ONLY開啟,
    再設定fetch size引數,表示採用伺服器端遊標,每次從伺服器取fetch_size條資料。

    public void getSubData(int i, final CallBack<SubData> cb) {

        StringBuffer sql = new StringBuffer();
        sql.append("select * from table");

        Connection con = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            con = subJdbcTemplate.getDataSource().getConnection();
            ps = con.prepareStatement(String.valueOf(sql), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            ps.setFetchSize(Integer.MIN_VALUE);
            ps.setFetchDirection(ResultSet.FETCH_REVERSE);
            rs = ps.executeQuery();

            long batchSize = 1000;
            int currLineNum = 0;
            List<SubData> result = Lists.newArrayList();

            while (rs.next()) {
                currLineNum = currLineNum   1;
                int cp = rs.getInt(1);
                String appId = rs.getString(2);
                String deviceId = rs.getString(3);
                String token = rs.getString(4);
                String alias = rs.getString(5);

                SubData subData = new SubData(appId, cp, appId, deviceId, token, alias);
                if (currLineNum % batchSize != 0) {
                    result.add(subData);
                } else {
                    result.add(subData);
                    List<SubData> copyList = Lists.newArrayList();
                    copyList.addAll(result);
                    result.clear();
                    cb.callBack(copyList);
                }
            }

            if (CollectionUtils.isNotEmpty(result)) {
                List<SubData> copyList = Lists.newArrayList();
                copyList.addAll(result);
                result.clear();
                cb.callBack(copyList);
            }
        } catch (SQLException e) {
            logger.error("getSubData error", e);
        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
            } catch (SQLException e) {
                logger.error("getSubData error", e);
            }
            try {
                if (ps != null) {
                    ps.close();
                }
            } catch (SQLException e) {
                logger.error("getSubData error", e);
            }
            try {
                if (con != null) {
                    con.close();
                }
            } catch (SQLException e) {
                logger.error("getSubData error", e);
            }
        }
    }

PreparedStatement中

    /**
     * We only stream result sets when they are forward-only, read-only, and the
     * 僅當結果集僅為只讀、只讀時,才流出結果集。
     * fetch size has been set to Integer.MIN_VALUE
     * 獲取大小已設定為整數。
     * @return true if this result set should be streamed row at-a-time, rather
     *         than read all at once.
     *  如果此結果集應按時間順序流行,則返回true,而不是一次閱讀。       
     *         
     */
    protected boolean createStreamingResultSet() {
        try {
            synchronized (checkClosed()) {
                return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY)
                        && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY) && (this.fetchSize == Integer.MIN_VALUE));
            }
        } catch (SQLException e) {
            // we can't break the interface, having this be no-op in case of error is ok
            return false;
        }
    }
  • 優點

    • 流式獲取資料,只需執行一次請求,效率高,不會發生OOM
  • 缺點

    • 流式執行失敗,需要從頭再開始

總結

個人推薦使用ResultSet方式流方式獲取資料