|
|
|
@ -37,7 +37,7 @@ import org.apache.kafka.connect.source.SourceRecord;
|
|
|
|
|
*/
|
|
|
|
|
public class SignalEventDispatcher {
|
|
|
|
|
|
|
|
|
|
private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
|
|
|
|
|
private static final SchemaNameAdjuster SCHEMA_NAME_ADJUSTER = SchemaNameAdjuster.create();
|
|
|
|
|
|
|
|
|
|
public static final String DATABASE_NAME = "db";
|
|
|
|
|
public static final String TABLE_NAME = "table";
|
|
|
|
@ -66,13 +66,13 @@ public class SignalEventDispatcher {
|
|
|
|
|
this.queue = queue;
|
|
|
|
|
this.signalEventKeySchema =
|
|
|
|
|
SchemaBuilder.struct()
|
|
|
|
|
.name(schemaNameAdjuster.adjust(SIGNAL_EVENT_KEY_SCHEMA_NAME))
|
|
|
|
|
.name(SCHEMA_NAME_ADJUSTER.adjust(SIGNAL_EVENT_KEY_SCHEMA_NAME))
|
|
|
|
|
.field(SPLIT_ID_KEY, Schema.STRING_SCHEMA)
|
|
|
|
|
.field(WATERMARK_SIGNAL, Schema.BOOLEAN_SCHEMA)
|
|
|
|
|
.build();
|
|
|
|
|
this.signalEventValueSchema =
|
|
|
|
|
SchemaBuilder.struct()
|
|
|
|
|
.name(schemaNameAdjuster.adjust(SIGNAL_EVENT_VALUE_SCHEMA_NAME))
|
|
|
|
|
.name(SCHEMA_NAME_ADJUSTER.adjust(SIGNAL_EVENT_VALUE_SCHEMA_NAME))
|
|
|
|
|
.field(SPLIT_ID_KEY, Schema.STRING_SCHEMA)
|
|
|
|
|
.field(WATERMARK_KIND, Schema.STRING_SCHEMA)
|
|
|
|
|
.field(BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA)
|
|
|
|
@ -87,7 +87,7 @@ public class SignalEventDispatcher {
|
|
|
|
|
SourceRecord sourceRecord =
|
|
|
|
|
new SourceRecord(
|
|
|
|
|
offsetContext.getPartition(),
|
|
|
|
|
offsetContext.getPartition(),
|
|
|
|
|
offsetContext.getOffset(),
|
|
|
|
|
topic,
|
|
|
|
|
signalEventKeySchema,
|
|
|
|
|
signalRecordKey(mySqlSplit.splitId()),
|
|
|
|
@ -120,9 +120,9 @@ public class SignalEventDispatcher {
|
|
|
|
|
BINLOG_END;
|
|
|
|
|
|
|
|
|
|
public WatermarkKind fromString(String kindString) {
|
|
|
|
|
if ("LOW".equalsIgnoreCase(kindString)) {
|
|
|
|
|
if (LOW.name().equalsIgnoreCase(kindString)) {
|
|
|
|
|
return LOW;
|
|
|
|
|
} else if ("HIGH".equalsIgnoreCase(kindString)) {
|
|
|
|
|
} else if (HIGH.name().equalsIgnoreCase(kindString)) {
|
|
|
|
|
return HIGH;
|
|
|
|
|
} else {
|
|
|
|
|
return BINLOG_END;
|
|
|
|
|