Skip to content

Commit a3d8d32

Browse files
author
dujie
committed
[feature-#1926] add paimon dependency (#1943)
1 parent 24b9713 commit a3d8d32

File tree

10 files changed

+103
-6
lines changed

10 files changed

+103
-6
lines changed

bin/start-chunjun

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ else
3333
fi
3434
fi
3535

36-
JAR_DIR=$CHUNJUN_HOME/lib/*
36+
JAR_DIR=$CHUNJUN_HOME/client/*
3737
CLASS_NAME=com.dtstack.chunjun.client.Launcher
3838

3939
echo "ChunJun starting ..."

bin/submit.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ fi
4646
# 1.In yarn-session case, JAR_DIR can not be found
4747
# 2.In other cases, JAR_DIR can be found
4848
if [ $CHUNJUN_DEPLOY_MODE -eq 1 ]; then
49-
JAR_DIR=$CHUNJUN_HOME/lib/chunjun-clients.jar:$CHUNJUN_HOME/lib/*
49+
JAR_DIR=$CHUNJUN_HOME/client/chunjun-clients.jar:$CHUNJUN_HOME/lib/*
5050
else
51-
JAR_DIR=$CHUNJUN_HOME/../lib/chunjun-clients.jar:$CHUNJUN_HOME/../lib/*
51+
JAR_DIR=$CHUNJUN_HOME/../client/chunjun-clients.jar:$CHUNJUN_HOME/../client/*
5252
fi
5353

5454
CLASS_NAME=com.dtstack.chunjun.client.Launcher

chunjun-catalog/chunjun-catalog-iceberg/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@
1616
<maven.compiler.source>8</maven.compiler.source>
1717
<maven.compiler.target>8</maven.compiler.target>
1818
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19+
<iceberg.version>1.5.2</iceberg.version>
1920
</properties>
2021

2122
<description>iceberg</description>
2223

2324
<dependencies>
24-
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-core -->
2525
<dependency>
2626
<groupId>org.apache.iceberg</groupId>
27-
<artifactId>iceberg-core</artifactId>
28-
<version>1.10.0</version>
27+
<artifactId>iceberg-flink-runtime-1.16</artifactId>
28+
<version>${iceberg.version}</version>
2929
</dependency>
3030
</dependencies>
3131

chunjun-catalog/chunjun-catalog-paimon/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@
2222
<description>paimon</description>
2323

2424
<dependencies>
25+
<dependency>
26+
<groupId>log4j</groupId>
27+
<artifactId>log4j</artifactId>
28+
<version>${log4j.version}</version>
29+
<!--<scope>provided</scope>-->
30+
</dependency>
31+
2532
<!-- https://mvnrepository.com/artifact/org.apache.paimon/paimon-flink-1.16 -->
2633
<dependency>
2734
<groupId>org.apache.paimon</groupId>

chunjun-clients/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,18 @@
4444
</exclusions>
4545
</dependency>
4646

47+
<dependency>
48+
<groupId>org.apache.flink</groupId>
49+
<artifactId>flink-connector-base</artifactId>
50+
<version>${flink.version}</version>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>org.apache.flink</groupId>
55+
<artifactId>flink-connector-files</artifactId>
56+
<version>${flink.version}</version>
57+
</dependency>
58+
4759
<dependency>
4860
<groupId>com.dtstack.chunjun</groupId>
4961
<artifactId>chunjun-core</artifactId>

chunjun-core/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,18 @@
191191
<artifactId>flink-table-planner-loader</artifactId>
192192
<version>${flink.version}</version>
193193
</dependency>
194+
195+
<dependency>
196+
<groupId>org.apache.flink</groupId>
197+
<artifactId>flink-connector-base</artifactId>
198+
<version>${flink.version}</version>
199+
</dependency>
200+
201+
<dependency>
202+
<groupId>org.apache.flink</groupId>
203+
<artifactId>flink-connector-files</artifactId>
204+
<version>${flink.version}</version>
205+
</dependency>
194206
<!-- flink sql -->
195207

196208
<!--flink与Hadoop mapReduce兼容包-->

chunjun-core/src/main/java/com/dtstack/chunjun/constants/ConstantValue.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public class ConstantValue {
7272

7373
public static final String CONNECTOR_DIR_NAME = "connector";
7474

75+
public static final String CATALOG_DIR_NAME = "catalog";
76+
7577
public static final String FORMAT_DIR_NAME = "formats";
7678

7779
public static final String DIRTY_DATA_DIR_NAME = "dirty-data-collector";

chunjun-core/src/main/java/org/apache/flink/table/factories/FactoryUtil.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,12 @@ public static <T extends Factory> T discoverFactory(
546546
classLoader,
547547
ConstantValue.CONNECTOR_DIR_NAME);
548548
}
549+
} else if (factoryClass.isAssignableFrom(CatalogFactory.class)) {
550+
com.dtstack.chunjun.util.FactoryHelper factoryHelper = factoryHelperThreadLocal.get();
551+
if (factoryHelper != null) {
552+
factoryHelper.registerCachedFile(
553+
unconvertedFactoryIdentifier, classLoader, ConstantValue.CATALOG_DIR_NAME);
554+
}
549555
} else {
550556
com.dtstack.chunjun.util.FactoryHelper factoryHelper = factoryHelperThreadLocal.get();
551557
if (factoryHelper != null) {
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
CREATE
2+
CATALOG my_catalog WITH (
3+
'type'='paimon',
4+
'metastore' = 'filesystem',
5+
'warehouse'='file:/tmp/paimon'
6+
);
7+
8+
USE CATALOG my_catalog;
9+
10+
-- create a word count table
11+
CREATE TABLE if not exists word_count
12+
(
13+
word STRING PRIMARY KEY NOT ENFORCED,
14+
cnt BIGINT
15+
);
16+
17+
CREATE
18+
TEMPORARY TABLE sink (
19+
word STRING,
20+
cnt BIGINT
21+
) WITH (
22+
'connector' = 'stream-x'
23+
);
24+
25+
insert into sink SELECT word, cnt FROM word_count ;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
CREATE
2+
CATALOG my_catalog WITH (
3+
'type'='paimon',
4+
'metastore' = 'filesystem',
5+
'warehouse'='file:/tmp/paimon'
6+
);
7+
8+
USE CATALOG my_catalog;
9+
10+
-- create a word count table
11+
CREATE TABLE if not exists word_count
12+
(
13+
word STRING PRIMARY KEY NOT ENFORCED,
14+
cnt BIGINT
15+
);
16+
17+
18+
CREATE
19+
TEMPORARY TABLE word_table (
20+
word STRING
21+
) WITH (
22+
'connector' = 'datagen',
23+
'fields.word.length' = '1'
24+
);
25+
26+
27+
28+
SET execution.checkpointing.interval=10000;
29+
30+
INSERT INTO word_count
31+
SELECT word, COUNT(*)
32+
FROM word_table
33+
GROUP BY word;

0 commit comments

Comments
 (0)