Skip to content

Commit 6ad4430

Browse files
author
高思伟
committed
[ISSUE #63]
Support rich initialization modes of RocketMQSource Fix some bugs
1 parent 90b00be commit 6ad4430

24 files changed

+1569
-368
lines changed

README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The RocketMQSourceFunction is based on RocketMQ pull consumer mode, and provides
1414
Otherwise, the source doesn't provide any reliability guarantees.
1515

1616
### KeyValueDeserializationSchema
17+
1718
The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema` interface.
1819
`rocketmq-flink` includes general purpose `KeyValueDeserializationSchema` implementations called `SimpleKeyValueDeserializationSchema`.
1920

@@ -23,7 +24,16 @@ public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>
2324
}
2425
```
2526

27+
## RocketMQSource
28+
29+
RocketMQSource implements Flink's new source interface, which provides the capability of stream-batch integration. You can now construct an instance with `RocketMQSourceBuilder.build()`.
30+
31+
### RocketMQDeserializationSchema
32+
33+
The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema` interface. `rocketmq-flink` includes general purpose `RocketMQDeserializationSchema` implementations called `RocketMQRowDeserializationSchema` and `SimpleStringSchema`. If you only care about the value of the message, you can use the wrapper class `RocketMQValueOnlyDeserializationSchemaWrapper` to wrap a value-only deserialization schema.
34+
2635
## RocketMQSink
36+
2737
To use the `RocketMQSink`, you construct an instance of it by specifying KeyValueSerializationSchema & TopicSelector instances and a Properties instance which includes the RocketMQ configs.
2838
`RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props)`
2939
The RocketMQSink provides at-least-once reliability guarantees when checkpoints are enabled and `withBatchFlushOnCheckpoint(true)` is set.
@@ -57,6 +67,9 @@ public interface TopicSelector<T> extends Serializable {
5767
```
5868

5969
## Examples
70+
71+
You can find more examples in the `org.apache.rocketmq.flink.legacy.example` directory.
72+
6073
The following is an example which receives messages from RocketMQ brokers and sends messages back to the broker after processing.
6174

6275
```java
@@ -119,7 +132,36 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
119132
}
120133
```
121134

135+
The following is an example which uses the new source to fetch records and deserialize them into simple strings.
136+
137+
```java
138+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
139+
env.enableCheckpointing(30000L);
140+
141+
RocketMQSource<String> source =
142+
RocketMQSource.<String>builder()
143+
.setNameServerAddress(nameServerAddress)
144+
.setTopic(topic)
145+
.setConsumerGroup(consumerGroup)
146+
.setStartFromEarliest()
147+
.setDeserializer(
148+
new RocketMQValueOnlyDeserializationSchemaWrapper<>(
149+
new SimpleStringSchema()))
150+
.build();
151+
152+
DataStreamSource<String> newSource =
153+
env.fromSource(source, WatermarkStrategy.noWatermarks(), "new source")
154+
.setParallelism(4);
155+
156+
newSource.print().setParallelism(1);
157+
158+
env.execute();
159+
```
160+
161+
162+
122163
## Configurations
164+
123165
The following configurations are all from the class `org.apache.rocketmq.flink.legacy.RocketMQConfig`.
124166

125167
### Producer Configurations

pom.xml

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,24 @@
4242
</properties>
4343

4444
<dependencies>
45+
<dependency>
46+
<groupId>org.slf4j</groupId>
47+
<artifactId>slf4j-log4j12</artifactId>
48+
<version>1.7.10</version>
49+
<scope>provided</scope>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.apache.flink</groupId>
53+
<artifactId>flink-runtime-web</artifactId>
54+
<version>${flink.version}</version>
55+
<scope>provided</scope>
56+
</dependency>
57+
<dependency>
58+
<groupId>org.apache.flink</groupId>
59+
<artifactId>flink-table-planner_2.12</artifactId>
60+
<version>${flink.version}</version>
61+
<scope>provided</scope>
62+
</dependency>
4563
<dependency>
4664
<groupId>org.apache.flink</groupId>
4765
<artifactId>flink-java</artifactId>
@@ -114,11 +132,27 @@
114132
<groupId>org.apache.rocketmq</groupId>
115133
<artifactId>rocketmq-namesrv</artifactId>
116134
<version>${rocketmq.version}</version>
135+
<exclusions>
136+
<exclusion>
137+
<artifactId>logback-core</artifactId>
138+
<groupId>ch.qos.logback</groupId>
139+
</exclusion>
140+
<exclusion>
141+
<artifactId>logback-classic</artifactId>
142+
<groupId>ch.qos.logback</groupId>
143+
</exclusion>
144+
</exclusions>
117145
</dependency>
118146
<dependency>
119147
<groupId>org.apache.rocketmq</groupId>
120148
<artifactId>rocketmq-broker</artifactId>
121149
<version>${rocketmq.version}</version>
150+
<exclusions>
151+
<exclusion>
152+
<artifactId>logback-classic</artifactId>
153+
<groupId>ch.qos.logback</groupId>
154+
</exclusion>
155+
</exclusions>
122156
</dependency>
123157
<dependency>
124158
<groupId>org.apache.rocketmq</groupId>
@@ -142,7 +176,7 @@
142176
<groupId>junit</groupId>
143177
<artifactId>junit</artifactId>
144178
<scope>test</scope>
145-
<version>4.12</version>
179+
<version>4.13.2</version>
146180
</dependency>
147181
<dependency>
148182
<groupId>org.powermock</groupId>
@@ -179,6 +213,7 @@
179213
<exclude>META-INF/*.SF</exclude>
180214
<exclude>META-INF/*.DSA</exclude>
181215
<exclude>META-INF/*.RSA</exclude>
216+
<exclude>log4j.properties</exclude>
182217
</excludes>
183218
</filter>
184219
</filters>
@@ -194,18 +229,6 @@
194229
</execution>
195230
</executions>
196231
</plugin>
197-
198-
<plugin>
199-
<artifactId>maven-compiler-plugin</artifactId>
200-
<version>3.5.1</version>
201-
<configuration>
202-
<source>${maven.compiler.source}</source>
203-
<target>${maven.compiler.target}</target>
204-
<compilerVersion>${maven.compiler.source}</compilerVersion>
205-
<showDeprecation>true</showDeprecation>
206-
<showWarnings>true</showWarnings>
207-
</configuration>
208-
</plugin>
209232
<plugin>
210233
<groupId>org.apache.maven.plugins</groupId>
211234
<artifactId>maven-surefire-plugin</artifactId>

src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818

1919
package org.apache.rocketmq.flink.common;
2020

21+
import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
22+
import org.apache.rocketmq.flink.legacy.common.config.StartupMode;
23+
2124
import org.apache.flink.configuration.ConfigOption;
2225
import org.apache.flink.configuration.ConfigOptions;
2326

24-
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;
25-
2627
/** Includes config options of RocketMQ connector type. */
2728
public class RocketMQOptions {
2829

@@ -44,25 +45,11 @@ public class RocketMQOptions {
4445
public static final ConfigOption<String> OPTIONAL_SQL =
4546
ConfigOptions.key("sql").stringType().noDefaultValue();
4647

47-
public static final ConfigOption<Long> OPTIONAL_START_MESSAGE_OFFSET =
48-
ConfigOptions.key("startMessageOffset")
49-
.longType()
50-
.defaultValue(DEFAULT_START_MESSAGE_OFFSET);
51-
52-
public static final ConfigOption<Long> OPTIONAL_START_TIME_MILLS =
53-
ConfigOptions.key("startTimeMs").longType().defaultValue(-1L);
54-
55-
public static final ConfigOption<String> OPTIONAL_START_TIME =
56-
ConfigOptions.key("startTime").stringType().noDefaultValue();
57-
58-
public static final ConfigOption<String> OPTIONAL_END_TIME =
59-
ConfigOptions.key("endTime").stringType().noDefaultValue();
60-
61-
public static final ConfigOption<String> OPTIONAL_TIME_ZONE =
62-
ConfigOptions.key("timeZone").stringType().noDefaultValue();
48+
public static final ConfigOption<Long> OPTIONAL_END_TIME_STAMP =
49+
ConfigOptions.key("endTimestamp").longType().defaultValue(Long.MAX_VALUE);
6350

6451
public static final ConfigOption<Long> OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS =
65-
ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L);
52+
ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(-1L);
6653

6754
public static final ConfigOption<Boolean> OPTIONAL_USE_NEW_API =
6855
ConfigOptions.key("useNewApi").booleanType().defaultValue(true);
@@ -109,9 +96,34 @@ public class RocketMQOptions {
10996
public static final ConfigOption<String> OPTIONAL_SECRET_KEY =
11097
ConfigOptions.key("secretKey").stringType().noDefaultValue();
11198

112-
public static final ConfigOption<String> OPTIONAL_SCAN_STARTUP_MODE =
113-
ConfigOptions.key("scanStartupMode").stringType().defaultValue("latest");
114-
115-
public static final ConfigOption<Long> OPTIONAL_OFFSET_FROM_TIMESTAMP =
116-
ConfigOptions.key("offsetFromTimestamp").longType().noDefaultValue();
99+
// --------------------------------------------------------------------------------------------
100+
// Scan specific options
101+
// --------------------------------------------------------------------------------------------
102+
103+
public static final ConfigOption<StartupMode> OPTIONAL_SCAN_STARTUP_MODE =
104+
ConfigOptions.key("scan.startup.mode")
105+
.enumType(StartupMode.class)
106+
.defaultValue(StartupMode.GROUP_OFFSETS)
107+
.withDescription("Startup mode for RocketMQ consumer.");
108+
109+
public static final ConfigOption<OffsetResetStrategy> OPTIONAL_SCAN_OFFSET_RESET_STRATEGY =
110+
ConfigOptions.key("scan.offsetReset.strategy")
111+
.enumType(OffsetResetStrategy.class)
112+
.defaultValue(OffsetResetStrategy.LATEST)
113+
.withDescription(
114+
"The offsetReset strategy only be used if group offsets is not found");
115+
116+
public static final ConfigOption<String> OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS =
117+
ConfigOptions.key("scan.startup.specific-offsets")
118+
.stringType()
119+
.noDefaultValue()
120+
.withDescription(
121+
"Optional offsets used in case of \"specific-offsets\" startup mode");
122+
123+
public static final ConfigOption<Long> OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS =
124+
ConfigOptions.key("scan.startup.timestamp-millis")
125+
.longType()
126+
.defaultValue(-1L)
127+
.withDescription(
128+
"Optional timestamp used in case of \"timestamp\" startup mode");
117129
}

src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java

Lines changed: 10 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,6 @@ public void open(Configuration parameters) throws Exception {
166166
this.enableCheckpoint =
167167
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
168168

169-
if (offsetTable == null) {
170-
offsetTable = new ConcurrentHashMap<>();
171-
}
172169
if (restoredOffsets == null) {
173170
restoredOffsets = new ConcurrentHashMap<>();
174171
}
@@ -247,7 +244,16 @@ public void open(Configuration parameters) throws Exception {
247244
// If the job recovers from the state, the state has already contained the offsets of last
248245
// commit.
249246
if (!restored) {
250-
initOffsets(messageQueues);
247+
this.offsetTable =
248+
RocketMQUtils.initOffsets(
249+
messageQueues,
250+
consumer,
251+
startMode,
252+
offsetResetStrategy,
253+
specificTimeStamp,
254+
specificStartupOffsets);
255+
} else {
256+
this.offsetTable = new ConcurrentHashMap<>();
251257
}
252258
}
253259

@@ -394,76 +400,6 @@ private void awaitTermination() throws InterruptedException {
394400
}
395401
}
396402

397-
/**
398-
* only flink job start with no state can init offsets from broker
399-
*
400-
* @param messageQueues
401-
* @throws MQClientException
402-
*/
403-
private void initOffsets(List<MessageQueue> messageQueues) throws MQClientException {
404-
for (MessageQueue mq : messageQueues) {
405-
long offset;
406-
switch (startMode) {
407-
case LATEST:
408-
offset = consumer.maxOffset(mq);
409-
break;
410-
case EARLIEST:
411-
offset = consumer.minOffset(mq);
412-
break;
413-
case GROUP_OFFSETS:
414-
offset = consumer.fetchConsumeOffset(mq, false);
415-
// the min offset return if consumer group first join,return a negative number
416-
// if
417-
// catch exception when fetch from broker.
418-
// If you want consumer from earliest,please use OffsetResetStrategy.EARLIEST
419-
if (offset <= 0) {
420-
switch (offsetResetStrategy) {
421-
case LATEST:
422-
offset = consumer.maxOffset(mq);
423-
log.info(
424-
"current consumer thread:{} has no committed offset,use Strategy:{} instead",
425-
mq,
426-
offsetResetStrategy);
427-
break;
428-
case EARLIEST:
429-
log.info(
430-
"current consumer thread:{} has no committed offset,use Strategy:{} instead",
431-
mq,
432-
offsetResetStrategy);
433-
offset = consumer.minOffset(mq);
434-
break;
435-
default:
436-
break;
437-
}
438-
}
439-
break;
440-
case TIMESTAMP:
441-
offset = consumer.searchOffset(mq, specificTimeStamp);
442-
break;
443-
case SPECIFIC_OFFSETS:
444-
if (specificStartupOffsets == null) {
445-
throw new RuntimeException(
446-
"StartMode is specific_offsets.But none offsets has been specified");
447-
}
448-
Long specificOffset = specificStartupOffsets.get(mq);
449-
if (specificOffset != null) {
450-
offset = specificOffset;
451-
} else {
452-
offset = consumer.fetchConsumeOffset(mq, false);
453-
}
454-
break;
455-
default:
456-
throw new IllegalArgumentException(
457-
"current startMode is not supported" + startMode);
458-
}
459-
log.info(
460-
"current consumer queue:{} start from offset of: {}",
461-
mq.getBrokerName() + "-" + mq.getQueueId(),
462-
offset);
463-
offsetTable.put(mq, offset);
464-
}
465-
}
466-
467403
/** consume from the min offset at every restart with no state */
468404
public RocketMQSourceFunction<OUT> setStartFromEarliest() {
469405
this.startMode = StartupMode.EARLIEST;

src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,38 @@
1717

1818
package org.apache.rocketmq.flink.legacy.common.config;
1919

20+
import org.apache.flink.configuration.DescribedEnum;
21+
import org.apache.flink.configuration.description.InlineElement;
22+
23+
import static org.apache.flink.configuration.description.TextElement.text;
24+
2025
/** RocketMQ startup mode. */
21-
public enum StartupMode {
22-
EARLIEST,
23-
LATEST,
24-
GROUP_OFFSETS,
25-
TIMESTAMP,
26-
SPECIFIC_OFFSETS
26+
public enum StartupMode implements DescribedEnum {
27+
EARLIEST("earliest-offset", text("Start from the earliest offset possible.")),
28+
LATEST("latest-offset", text("Start from the latest offset.")),
29+
GROUP_OFFSETS(
30+
"group-offsets",
31+
text("Start from committed offsets in brokers of a specific consumer group.")),
32+
TIMESTAMP("timestamp", text("Start from user-supplied timestamp for each message queue.")),
33+
SPECIFIC_OFFSETS(
34+
"specific-offsets",
35+
text("Start from user-supplied specific offsets for each message queue."));
36+
37+
private final String value;
38+
private final InlineElement description;
39+
40+
StartupMode(String value, InlineElement description) {
41+
this.value = value;
42+
this.description = description;
43+
}
44+
45+
@Override
46+
public InlineElement getDescription() {
47+
return description;
48+
}
49+
50+
@Override
51+
public String toString() {
52+
return value;
53+
}
2754
}

0 commit comments

Comments
 (0)