|
|
|
@ -63,14 +63,13 @@ public class MongoDBMetricCase extends MongoDBSourceTestBase {
|
|
|
|
|
.hosts(mongoContainer.getHostAndPort())
|
|
|
|
|
.username(FLINK_USER)
|
|
|
|
|
.password(FLINK_USER_PASSWORD)
|
|
|
|
|
.databaseList(customerDatabase) // 设置捕获的数据库,支持正则表达式
|
|
|
|
|
.databaseList(customerDatabase)
|
|
|
|
|
.collectionList(
|
|
|
|
|
getCollectionNameRegex(
|
|
|
|
|
customerDatabase,
|
|
|
|
|
new String[] {"customers"})) // 设置捕获的集合,支持正则表达式
|
|
|
|
|
customerDatabase, new String[] {"customers"}))
|
|
|
|
|
.deserializer(new JsonDebeziumDeserializationSchema())
|
|
|
|
|
.build();
|
|
|
|
|
DataStreamSource<String> stream = env.addSource(sourceFunction, "MongoDB");
|
|
|
|
|
DataStreamSource<String> stream = env.addSource(sourceFunction, "MongoDB CDC Source");
|
|
|
|
|
CollectResultIterator<String> iterator = addCollector(env, stream);
|
|
|
|
|
JobClient jobClient = env.executeAsync();
|
|
|
|
|
iterator.setJobClient(jobClient);
|
|
|
|
@ -87,7 +86,7 @@ public class MongoDBMetricCase extends MongoDBSourceTestBase {
|
|
|
|
|
|
|
|
|
|
// Check metrics
|
|
|
|
|
List<OperatorMetricGroup> metricGroups =
|
|
|
|
|
metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MongoDB");
|
|
|
|
|
metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MongoDB CDC Source");
|
|
|
|
|
|
|
|
|
|
// There should be only 1 parallelism of source, so it's safe to get the only group
|
|
|
|
|
OperatorMetricGroup group = metricGroups.get(0);
|
|
|
|
|