diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/DataSource.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/DataSource.java new file mode 100644 index 000000000..3ba94ca43 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/DataSource.java @@ -0,0 +1,33 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.source; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * {@code DataSource} is used to access metadata and read change data from external systems. It can + * read data from multiple tables simultaneously. + */ +@PublicEvolving +public interface DataSource { + + /** Get the {@link EventSourceProvider} for reading events from external systems. */ + EventSourceProvider getEventSourceProvider(); + + /** Get the {@link MetadataAccessor} for accessing metadata from external systems. */ + MetadataAccessor getMetadataAccessor(); +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/EventSourceProvider.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/EventSourceProvider.java new file mode 100644 index 000000000..72b41ba81 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/EventSourceProvider.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +/** + * A marker interface used to provide an event source for reading events from external systems. We + * can reuse exiting Flink {@link Source} and Flink {@link SourceFunction} implementation, and we + * can support our own {@code EventSource} implementation in the future. + */ +@PublicEvolving +public interface EventSourceProvider {} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/FlinkSourceFunctionProvider.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/FlinkSourceFunctionProvider.java new file mode 100644 index 000000000..7c31618f3 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/FlinkSourceFunctionProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import com.ververica.cdc.common.event.Event; + +/** + * {@code FlinkSourceFunctionProvider} is used to provide a Flink {@link SourceFunction} for reading + * events from external systems. + */ +@PublicEvolving +public interface FlinkSourceFunctionProvider extends EventSourceProvider { + + /** Get the {@link SourceFunction} for reading events from external systems. */ + SourceFunction getSourceFunction(); + + /** Create a {@link FlinkSourceFunctionProvider} from a {@link SourceFunction}. */ + static FlinkSourceFunctionProvider of(SourceFunction sourceFunction) { + return () -> sourceFunction; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/FlinkSourceProvider.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/FlinkSourceProvider.java new file mode 100644 index 000000000..cdcf1e0f7 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/FlinkSourceProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Source; + +import com.ververica.cdc.common.event.Event; + +/** + * {@code FlinkSourceProvider} is used to provide a Flink {@link Source} for reading events from + * external systems. + */ +@PublicEvolving +public interface FlinkSourceProvider extends EventSourceProvider { + + /** Get the {@link Source} for reading events from external systems. */ + Source getSource(); + + /** Create a {@link FlinkSourceProvider} from a {@link Source}. */ + static FlinkSourceProvider of(Source source) { + return () -> source; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/MetadataAccessor.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/MetadataAccessor.java new file mode 100644 index 000000000..1b663e522 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/MetadataAccessor.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.source; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.event.TableId; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * {@code MetadataAccessor} is used by {@link DataSource} to access the metadata from external + * systems. + */ +@PublicEvolving +public interface MetadataAccessor { + + /** List all namespaces from external systems. */ + List listNamespaces() throws UnsupportedOperationException; + + /** List schemas by namespace from external systems. */ + List listSchemas(@Nullable String namespace) throws UnsupportedOperationException; + + /** List tables by namespace and schema from external systems. */ + List listTables(@Nullable String namespace, @Nullable String schemaName); + + // TODO: Schema getTableSchema(TableID tableId); +}