Mysql海量資料遷移

NO IMAGE
1 Star2 Stars3 Stars4 Stars5 Stars 給文章打分!
Loading...

問題

專案中遇到對錶資料全量遷移,表資料量比較大,基本上是在百萬、千萬級別。

方案

limit方式

首先最開始的方案就是簡單粗暴最直接的Limit查詢,這種方案對少量資料是可行的,但是到後面資料量大的時候
再進行limit x,y 這個時候效率會很低,會執行全表掃描,例如

select * from table limit 150000,1000;
  • 優點

    • 實現邏輯簡單
  • 缺點

    • limit資料量大的時候效率低

索引方式

此方式是直接獲取的滿足條件的最小和最大ID(主鍵),然後通過ID區間的批量BETWEEN AND方式來獲取資料

  • 優點

    • 每次查詢都是通過索引ID來獲取資料,執行效率高
  • 缺點

    • 在獲取min(id)和max(id)會執行全表掃描,比較慢
    • id段不連續,會出現不符合條件的空查詢
    public File getData() {
long min = x;
long max = y;
if (min == 0 || max == 0) {
return null;
}
long size = max - min   1;
int batchNum = 5000;
int pageNum = size % batchNum == 0 ? (int) (size / batchNum) : (int) (size / batchNum)   1;
ExecutorService es = Executors.newFixedThreadPool(5);
Future<Boolean>[] result = new Future[pageNum];
for (int offset = 0; offset < pageNum; offset  ) {
Future<Boolean> future = es.submit(new DataFuture(min, batchNum, offset));
result[i] = future;
}
for (Future<Boolean> future : result) {
try {
future.get();
} catch (Exception e) {
logger.error("callBack error", e);
}
}
es.shutdown();
}

DataFuture執行分批資料獲取


class DataFuture implements Callable<Boolean> {
ResultCallBack<Set<String>> callBack;
int offset;
long min;
int batchNum;
public DataFuture(long min, int batchNum, int offset) {
super();
this.batchNum = batchNum;
this.offset = offset;
this.min = min;
}
@Override
public Boolean call() throws Exception {
long start = offset * batchNum   min;
long end = start   batchNum - 1;
// 資料庫執行 select * from table where id BETWEEN start and end
Set<String> querySet = getData(start, end);
return true;
}
}

ResultSet方式

採用流方式獲取資料

  • 當statement設定以下屬性時,採用的是流資料接收方式,每次只從伺服器接收部份資料,
    直到所有資料處理完畢,不會發生JVM OOM。
  setResultSetType(ResultSet.TYPE_FORWARD_ONLY);
setFetchSize(Integer.MIN_VALUE); 
  • 呼叫statement的enableStreamingResults方法,實際上enableStreamingResults方法內部封裝的就是第1種方式。

  • 設定連線屬性useCursorFetch=true (5.0版驅動開始支援),statement以TYPE_FORWARD_ONLY開啟,
    再設定fetch size引數,表示採用伺服器端遊標,每次從伺服器取fetch_size條資料。

    public void getSubData(int i, final CallBack<SubData> cb) {
StringBuffer sql = new StringBuffer();
sql.append("select * from table");
Connection con = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
con = subJdbcTemplate.getDataSource().getConnection();
ps = con.prepareStatement(String.valueOf(sql), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(Integer.MIN_VALUE);
ps.setFetchDirection(ResultSet.FETCH_REVERSE);
rs = ps.executeQuery();
long batchSize = 1000;
int currLineNum = 0;
List<SubData> result = Lists.newArrayList();
while (rs.next()) {
currLineNum = currLineNum   1;
int cp = rs.getInt(1);
String appId = rs.getString(2);
String deviceId = rs.getString(3);
String token = rs.getString(4);
String alias = rs.getString(5);
SubData subData = new SubData(appId, cp, appId, deviceId, token, alias);
if (currLineNum % batchSize != 0) {
result.add(subData);
} else {
result.add(subData);
List<SubData> copyList = Lists.newArrayList();
copyList.addAll(result);
result.clear();
cb.callBack(copyList);
}
}
if (CollectionUtils.isNotEmpty(result)) {
List<SubData> copyList = Lists.newArrayList();
copyList.addAll(result);
result.clear();
cb.callBack(copyList);
}
} catch (SQLException e) {
logger.error("getSubData error", e);
} finally {
try {
if (rs != null) {
rs.close();
}
} catch (SQLException e) {
logger.error("getSubData error", e);
}
try {
if (ps != null) {
ps.close();
}
} catch (SQLException e) {
logger.error("getSubData error", e);
}
try {
if (con != null) {
con.close();
}
} catch (SQLException e) {
logger.error("getSubData error", e);
}
}
}

PreparedStatement中

    /**
* We only stream result sets when they are forward-only, read-only, and the
* 僅當結果集僅為只讀、只讀時,才流出結果集。
* fetch size has been set to Integer.MIN_VALUE
* 獲取大小已設定為整數。
* @return true if this result set should be streamed row at-a-time, rather
*         than read all at once.
*  如果此結果集應按時間順序流行,則返回true,而不是一次閱讀。       
*         
*/
protected boolean createStreamingResultSet() {
try {
synchronized (checkClosed()) {
return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY)
&& (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY) && (this.fetchSize == Integer.MIN_VALUE));
}
} catch (SQLException e) {
// we can't break the interface, having this be no-op in case of error is ok
return false;
}
}
  • 優點

    • 流式獲取資料,只需執行一次請求,效率高,不會發生OOM
  • 缺點

    • 流式執行失敗,需要從頭再開始

總結

個人推薦使用ResultSet方式流方式獲取資料

程式語言 最新文章