Skip to content

Commit 8422832

Browse files
authored
[Feature][jdbc splitpk] support manually set the Min/Max value (#1541)
1 parent 47a72c7 commit 8422832

File tree

3 files changed

+74
-16
lines changed

3 files changed

+74
-16
lines changed

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable {
5656
protected String orderByColumn;
5757
protected String querySql;
5858
protected String splitPk;
59+
protected String splitPkStart;
60+
protected String splitPkEnd;
5961
protected String splitStrategy;
6062
protected int fetchSize = 0;
6163
protected int queryTimeOut = 0;
@@ -142,6 +144,22 @@ public String getTable() {
142144
return connection.get(0).getTable().get(0);
143145
}
144146

147+
public String getSplitPkStart() {
148+
return splitPkStart;
149+
}
150+
151+
public void setSplitPkStart(String splitPkStart) {
152+
this.splitPkStart = splitPkStart;
153+
}
154+
155+
public String getSplitPkEnd() {
156+
return splitPkEnd;
157+
}
158+
159+
public void setSplitPkEnd(String splitPkEnd) {
160+
this.splitPkEnd = splitPkEnd;
161+
}
162+
145163
public void setTable(String table) {
146164
connection.get(0).getTable().set(0, table);
147165
}
@@ -485,6 +503,12 @@ public String toString() {
485503
+ ", splitPk='"
486504
+ splitPk
487505
+ '\''
506+
+ ", splitPkStart='"
507+
+ splitPkStart
508+
+ '\''
509+
+ ", splitPkEnd='"
510+
+ splitPkEnd
511+
+ '\''
488512
+ ", splitStrategy='"
489513
+ splitStrategy
490514
+ '\''

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -394,25 +394,32 @@ private Pair<String, String> getSplitRangeFromDb() {
394394
try {
395395
long startTime = System.currentTimeMillis();
396396

397-
String querySplitRangeSql = SqlUtil.buildQuerySplitRangeSql(jdbcConf, jdbcDialect);
398-
LOG.info(String.format("Query SplitRange sql is '%s'", querySplitRangeSql));
399-
400-
conn = getConnection();
401-
st = conn.createStatement(resultSetType, resultSetConcurrency);
402-
st.setQueryTimeout(jdbcConf.getQueryTimeOut());
403-
rs = st.executeQuery(querySplitRangeSql);
404-
if (rs.next()) {
397+
if (jdbcConf.getSplitPkStart() != null && jdbcConf.getSplitPkEnd() != null) {
405398
splitPkRange =
406399
Pair.of(
407-
String.valueOf(rs.getObject("min_value")),
408-
String.valueOf(rs.getObject("max_value")));
409-
}
400+
String.valueOf(jdbcConf.getSplitPkStart()),
401+
String.valueOf(jdbcConf.getSplitPkEnd()));
410402

411-
LOG.info(
412-
String.format(
413-
"Takes [%s] milliseconds to get the SplitRange value [%s]",
414-
System.currentTimeMillis() - startTime, splitPkRange));
403+
} else {
404+
String querySplitRangeSql = SqlUtil.buildQuerySplitRangeSql(jdbcConf, jdbcDialect);
405+
LOG.info(String.format("Query SplitRange sql is '%s'", querySplitRangeSql));
406+
407+
conn = getConnection();
408+
st = conn.createStatement(resultSetType, resultSetConcurrency);
409+
st.setQueryTimeout(jdbcConf.getQueryTimeOut());
410+
rs = st.executeQuery(querySplitRangeSql);
411+
if (rs.next()) {
412+
splitPkRange =
413+
Pair.of(
414+
String.valueOf(rs.getObject("min_value")),
415+
String.valueOf(rs.getObject("max_value")));
416+
}
415417

418+
LOG.info(
419+
String.format(
420+
"Takes [%s] milliseconds to get the SplitRange value [%s]",
421+
System.currentTimeMillis() - startTime, splitPkRange));
422+
}
416423
return splitPkRange;
417424
} catch (Throwable e) {
418425
throw new ChunJunRuntimeException(
@@ -806,7 +813,8 @@ protected Connection getConnection() throws SQLException {
806813
/** 使用自定义的指标输出器把增量指标打到普罗米修斯 */
807814
@Override
808815
protected boolean useCustomReporter() {
809-
return jdbcConf.isIncrement() && jdbcConf.getInitReporter();
816+
// 配置了 reporter 就可以输入指标到外部系统, 如果不是增量, 增量指标也不会被输出
817+
return jdbcConf.getInitReporter();
810818
}
811819

812820
/** 为了保证增量数据的准确性,指标输出失败时使任务失败 */

docs_zh/ChunJun连接器/oracle/oracle-source.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,32 @@ Oracle 9 及以上
110110
- 默认值:无
111111
<br />
112112
113+
- **splitPkStart**
114+
115+
- 描述: 当指定了 `splitPk` 之后,可以手动指定 `splitPk` 的上/下界,之后可以直接根据这两个值计算每个并行度的数据量。
116+
- 注意:
117+
- 此参数生效的前提是设置了 `splitPk`,并且需要同时设置 **splitPkEnd** 参数。
118+
- 推荐splitPk使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
119+
- 目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,ChunJun将报错。
120+
- 如果channel大于1但是没有配置此参数,任务将置为失败。
121+
- 必选:否
122+
- 参数类型:String
123+
- 默认值:无
124+
<br />
125+
126+
- **splitPkEnd**
127+
128+
- 描述: 当指定了 `splitPk` 之后,可以手动指定 `splitPk` 的上/下界,之后可以直接根据这两个值计算每个并行度的数据量。
129+
- 注意:
130+
- 此参数生效的前提是设置了 `splitPk`,并且需要同时设置 **splitPkStart** 参数。
131+
- 推荐splitPk使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
132+
- 目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,ChunJun将报错。
133+
- 如果channel大于1但是没有配置此参数,任务将置为失败。
134+
- 必选:否
135+
- 参数类型:String
136+
- 默认值:无
137+
<br />
138+
113139
- **queryTimeOut**
114140
115141
- 描述:查询超时时间,单位秒。

0 commit comments

Comments
 (0)