diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Selectors.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Selectors.java new file mode 100644 index 000000000..ae7c0c79f --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Selectors.java @@ -0,0 +1,125 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.schema; + +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.utils.Predicates; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; + +/** Selectors for filtering tables. */ +public class Selectors { + + private List selectors; + + private Selectors() {} + + /** + * A {@link Selector} that determines whether a table identified by a given {@link TableId} is + * to be included. + */ + private static class Selector { + private final Predicate namespacePred; + private final Predicate schemaNamePred; + private final Predicate tableNamePred; + + public Selector(String namespace, String schemaName, String tableName) { + this.namespacePred = + namespace == null ? (namespacePred) -> false : Predicates.includes(namespace); + this.schemaNamePred = + schemaName == null + ? (schemaNamePred) -> false + : Predicates.includes(schemaName); + this.tableNamePred = + tableName == null ? 
(tableNamePred) -> false : Predicates.includes(tableName); + } + + public boolean isMatch(TableId tableId) { + + String namespace = tableId.getNamespace(); + String schemaName = tableId.getSchemaName(); + + if (namespace == null || namespace.isEmpty()) { + if (schemaName == null || schemaName.isEmpty()) { + return tableNamePred.test(tableId.getTableName()); + } + return schemaNamePred.test(tableId.getSchemaName()) + && tableNamePred.test(tableId.getTableName()); + } + return namespacePred.test(tableId.getNamespace()) + && schemaNamePred.test(tableId.getSchemaName()) + && tableNamePred.test(tableId.getTableName()); + } + } + + /** Match the {@link TableId} against the {@link Selector}s. * */ + public boolean isMatch(TableId tableId) { + for (Selector selector : selectors) { + if (selector.isMatch(tableId)) { + return true; + } + } + return false; + } + + /** Builder for {@link Selectors}. */ + public static class SelectorsBuilder { + + private List selectors; + + public SelectorsBuilder includeTables(String tableInclusions) { + + if (tableInclusions == null || tableInclusions.isEmpty()) { + throw new IllegalArgumentException( + "Invalid table inclusion pattern cannot be null or empty"); + } + + List selectors = new ArrayList<>(); + Set tableSplitSet = + Predicates.setOf( + tableInclusions, Predicates.RegExSplitterByComma::split, (str) -> str); + for (String tableSplit : tableSplitSet) { + Set tableIdSet = + Predicates.setOf( + tableSplit, Predicates.RegExSplitterByDot::split, (str) -> str); + Iterator iterator = tableIdSet.iterator(); + if (tableIdSet.size() == 1) { + selectors.add(new Selector(null, null, iterator.next())); + } else if (tableIdSet.size() == 2) { + selectors.add(new Selector(null, iterator.next(), iterator.next())); + } else if (tableIdSet.size() == 3) { + selectors.add(new Selector(iterator.next(), iterator.next(), iterator.next())); + } else { + throw new IllegalArgumentException( + "Invalid table inclusion pattern: " + tableInclusions); + } + } + 
this.selectors = selectors; + return this; + } + + public Selectors build() { + Selectors selectors = new Selectors(); + selectors.selectors = this.selectors; + return selectors; + } + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/TableFilter.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/TableFilter.java new file mode 100644 index 000000000..a5b64722c --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/TableFilter.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.schema; + +import com.ververica.cdc.common.event.TableId; + +import java.util.function.Predicate; + +/** A filter for tables. */ +@FunctionalInterface +public interface TableFilter { + + /** Determines whether the given table should be included. */ + boolean isIncluded(TableId tableId); + + /** Creates a {@link TableFilter} from the given predicate. */ + static TableFilter fromPredicate(Predicate predicate) { + return predicate::test; + } + + /** Creates a {@link TableFilter} that includes all tables. 
*/ + static TableFilter includeAll() { + return t -> true; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/text/ParsingException.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/text/ParsingException.java new file mode 100644 index 000000000..e8808b6d0 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/text/ParsingException.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.text; + +/** An exception representing a problem during parsing of text. 
*/ +public class ParsingException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + private final Position position; + + /** @param position the position of the error; never null */ + public ParsingException(Position position) { + super(); + this.position = position; + } + + /** + * @param position the position of the error; never null + * @param message the message + * @param cause the underlying cause + */ + public ParsingException(Position position, String message, Throwable cause) { + super(message, cause); + this.position = position; + } + + /** + * @param position the position of the error; never null + * @param message the message + */ + public ParsingException(Position position, String message) { + super(message); + this.position = position; + } + + public Position getPosition() { + return position; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/text/Position.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/text/Position.java new file mode 100644 index 000000000..f701d2d4f --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/text/Position.java @@ -0,0 +1,106 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.text; + +/** + * A class that represents the position of a particular character in terms of the lines and columns + * of a character sequence. 
+ */ +public final class Position { + + /** The position is used when there is no content. */ + public static final Position EMPTY_CONTENT_POSITION = new Position(-1, 1, 0); + + private final int line; + private final int column; + private final int indexInContent; + + public Position(int indexInContent, int line, int column) { + this.indexInContent = indexInContent < 0 ? -1 : indexInContent; + this.line = line; + this.column = column; + + assert this.line > 0; + assert this.column >= 0; + // make sure that negative index means an EMPTY_CONTENT_POSITION + assert this.indexInContent >= 0 || this.line == 1 && this.column == 0; + } + + /** + * Get the 0-based index of this position in the content character array. + * + * @return the index; never negative except for the first position in an empty content. + */ + public int index() { + return indexInContent; + } + + /** + * Get the 1-based column number of the character. + * + * @return the column number; always positive + */ + public int column() { + return column; + } + + /** + * Get the 1-based line number of the character. + * + * @return the line number; always positive + */ + public int line() { + return line; + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override + public int hashCode() { + return indexInContent; + } + + @Override + public String toString() { + return "" + indexInContent + ':' + line + ':' + column; + } + + /** + * Return a new position that is the addition of this position and that supplied. + * + * @param position the position to add to this object; may not be null + * @return the combined position + */ + public Position add(Position position) { + if (this.index() < 0) { + return position.index() < 0 ? EMPTY_CONTENT_POSITION : position; + } + + if (position.index() < 0) { + return this; + } + + int index = this.index() + position.index(); + int line = position.line() + this.line() - 1; + int column = this.line() == 1 ? 
this.column() + position.column() : this.column(); + + return new Position(index, line, column); + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/text/TokenStream.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/text/TokenStream.java new file mode 100644 index 000000000..b93b9679d --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/text/TokenStream.java @@ -0,0 +1,720 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.text; + +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; +import java.util.Objects; + +/** A stream of tokens that can be consumed by a parser. This class is not thread-safe. */ +@NotThreadSafe +public class TokenStream { + + /** + * An opaque marker for a position within the token stream. + * + * @see TokenStream#mark() + */ + public static final class Marker implements Comparable { + private final int tokenIndex; + private final Position position; + + private Marker(Position position, int index) { + this.position = position; + this.tokenIndex = index; + } + + /** + * Get the position of this marker, or null if this is at the start or end of the token + * stream. + * + * @return the position. 
+ */ + public Position position() { + return position; + } + + @Override + public int compareTo(Marker that) { + if (this == that) { + return 0; + } + return this.tokenIndex - that.tokenIndex; + } + + @Override + public String toString() { + return Integer.toString(tokenIndex); + } + } + + public static final String ANY_VALUE = "any value"; + public static final int ANY_TYPE = Integer.MIN_VALUE; + + protected final String inputString; + private final char[] inputContent; + private final boolean caseSensitive; + private final Tokenizer tokenizer; + private List tokens; + /** + * This class navigates the Token objects using this iterator. However, because it very often + * needs to access the "current token" in the "consume(...)" and "canConsume(...)" and + * "matches(...)" methods, the class caches a "current token" and makes this iterator point to + * the 2nd token. + * + *
+     *     T1     T2    T3    T4    T5
+     *         ^   ^  ^
+     *         |   |  |
+     *         |   |  +- The position of the tokenIterator, where tokenIterator.next() will return T3
+     *         |   +---- The token referenced by currentToken
+     *         +-------- The logical position of the TokenStream object, where the "consume()" would return T2
+     * 
+ */ + private ListIterator tokenIterator; + + private Token currentToken; + private boolean completed; + + public TokenStream(String content, Tokenizer tokenizer, boolean caseSensitive) { + Objects.requireNonNull(content, "content"); + Objects.requireNonNull(tokenizer, "tokenizer"); + this.inputString = content; + this.inputContent = content.toCharArray(); + this.caseSensitive = caseSensitive; + this.tokenizer = tokenizer; + } + + /** + * Begin the token stream, including (if required) the tokenization of the input content. + * + * @return this object for easy method chaining; never null + * @throws ParsingException if an error occurs during tokenization of the content + */ + public TokenStream start() throws ParsingException { + // Create the tokens ... + if (tokens == null) { + TokenFactory tokenFactory = + caseSensitive + ? new CaseSensitiveTokenFactory() + : new CaseInsensitiveTokenFactory(); + CharacterStream characterStream = new CharacterArrayStream(inputContent); + tokenizer.tokenize(characterStream, tokenFactory); + this.tokens = initializeTokens(tokenFactory.getTokens()); + } + + // Create the iterator ... + tokenIterator = this.tokens.listIterator(); + moveToNextToken(); + return this; + } + + /** + * Method to allow subclasses to pre-process the set of tokens and return the correct tokens to + * use. The default behavior is to simply return the supplied tokens. + * + * @param tokens the tokens + * @return list of tokens. + */ + protected List initializeTokens(List tokens) { + return tokens; + } + + /** + * Obtain a marker that records the current position so that the stream can be {@link + * #rewind(Marker)} back to the mark even after having been advanced beyond the mark. 
+ * + * @return the marker; never null + * @throws IllegalStateException if this method was called before the stream was {@link #start() + * started} + * @throws NoSuchElementException if there are no more tokens + */ + public Marker mark() { + if (completed) { + return new Marker(null, tokenIterator.previousIndex()); + } + Token currentToken = currentToken(); + Position currentPosition = currentToken != null ? currentToken.position() : null; + return new Marker(currentPosition, tokenIterator.previousIndex()); + } + + /** + * Reset the stream back to the position described by the supplied marker. This method does + * nothing if the mark is invalid. For example, it is not possible to advance the token stream + * beyond the current position. + * + * @param marker the marker + * @return true if the token stream was reset, or false if the marker was invalid + */ + public boolean rewind(Marker marker) { + if (marker.tokenIndex >= 0 && marker.tokenIndex <= this.tokenIterator.nextIndex()) { + completed = false; + currentToken = null; + tokenIterator = this.tokens.listIterator(marker.tokenIndex); + moveToNextToken(); + return true; + } + return false; + } + + /** + * Return the value of this token and move to the next token. + * + * @return the value of the current token + * @throws ParsingException if there is no such token to consume + * @throws IllegalStateException if this method was called before the stream was {@link #start() + * started} + */ + public String consume() throws ParsingException, IllegalStateException { + if (completed) { + throwNoMoreContent(); + } + // Get the value from the current token ... + String result = currentToken().value(); + moveToNextToken(); + return result; + } + + protected void throwNoMoreContent() throws ParsingException { + Position pos = + tokens.isEmpty() + ? 
new Position(-1, 1, 0) + : tokens.get(tokens.size() - 1).position(); + throw new ParsingException(pos, "No more content"); + } + + public String peek() throws IllegalStateException { + if (completed) { + throwNoMoreContent(); + } + // Get the value from the current token but do NOT advance ... + return currentToken().value(); + } + + /** + * Determine if the current token matches the expected value. + * + *

The {@link #ANY_VALUE ANY_VALUE} constant can be used as a wildcard. + * + * @param expected the expected value of the current token + * @return true if the current token did match, or false if the current token did not match + * @throws IllegalStateException if this method was called before the stream was {@link #start() + * started} + */ + public boolean matches(String expected) throws IllegalStateException { + return matches(ANY_TYPE, expected); + } + + /** + * Determine if the current token matches the expected type and a value. + * + *

The {@link #ANY_VALUE ANY_VALUE} constant can be used as a wildcard. + * + * @param type the expected type of the curent token + * @param expected the expected value of the current token + * @return true if the current token did match, or false if the current token did not match + * @throws IllegalStateException if this method was called before the stream was {@link #start() + * started} + */ + public boolean matches(int type, String expected) throws IllegalStateException { + return !completed + && (Objects.equals(expected, ANY_VALUE) || currentToken().matches(expected)) + && currentToken().matches(type); + } + + /** + * Determine if the next token matches one of the supplied values. + * + * @param options the options for the value of the current token + * @return true if the current token's value did match one of the supplied options, or false + * otherwise + * @throws IllegalStateException if this method was called before the stream was {@link #start() + * started} + */ + public boolean matchesAnyOf(String[] options) throws IllegalStateException { + if (completed) { + return false; + } + Token current = currentToken(); + for (String option : options) { + if (current.matches(option)) { + return true; + } + } + return false; + } + + /** + * Determine if this stream has another token to be consumed. 
+ * + * @return true if there is another token ready for consumption, or false otherwise + * @throws IllegalStateException if this method was called before the stream was {@link #start() + * started} + */ + public boolean hasNext() { + if (tokenIterator == null) { + throw new IllegalStateException("start() method must be called before hasNext()"); + } + return !completed; + } + + @Override + public String toString() { + ListIterator iter = tokens.listIterator(tokenIterator.previousIndex()); + StringBuilder sb = new StringBuilder(); + if (iter.hasNext()) { + sb.append(iter.next()); + int count = 1; + while (iter.hasNext()) { + if (count > 20) { + sb.append(" ..."); + break; + } + sb.append(" "); + ++count; + sb.append(iter.next()); + } + } + return sb.toString(); + } + + private void moveToNextToken(List newTokens) { + if (newTokens != null && !newTokens.isEmpty()) { + for (Token t : newTokens) { + tokenIterator.add(t); + } + for (int i = 0; i < newTokens.size() - 1; i++) { + tokenIterator.previous(); + } + currentToken = newTokens.get(0); + return; + } + // And move the currentToken to the next token ... + if (!tokenIterator.hasNext()) { + completed = true; + currentToken = null; + } else { + currentToken = tokenIterator.next(); + } + } + + private void moveToNextToken() { + moveToNextToken(null); + } + + /** + * Get the current token. 
+ * + * @return the current token; never null + * @throws IllegalStateException if this method was called before the stream was {@link #start() + * started} + * @throws NoSuchElementException if there are no more tokens + */ + final Token currentToken() throws IllegalStateException, NoSuchElementException { + if (currentToken == null) { + if (completed) { + throw new NoSuchElementException("No more content"); + } + throw new IllegalStateException( + "start() method must be called before consuming or matching"); + } + return currentToken; + } + + /** + * Interface for a Tokenizer component responsible for processing the characters in a {@link + * CharacterStream} and constructing the appropriate {@link Token} objects. + */ + public interface Tokenizer { + /** + * Process the supplied characters and construct the appropriate {@link Token} objects. + * + * @param input the character input stream; never null + * @param tokens the factory for {@link Token} objects, which records the order in which the + * tokens are created + * @throws ParsingException if there is an error while processing the character stream + * (e.g., a quote is not closed, etc.) + */ + void tokenize(CharacterStream input, Tokens tokens) throws ParsingException; + } + + /** + * Interface used by a {@link Tokenizer} to iterate through the characters in the content input + * to the {@link TokenStream}. + */ + public interface CharacterStream { + + /** + * Determine if there is another character available in this stream. + * + * @return true if there is another character (and {@link #next()} can be called), or false + * otherwise + */ + boolean hasNext(); + + /** + * Obtain the next character value, and advance the stream. + * + * @return the next character + * @throws NoSuchElementException if there is no {@link #hasNext() next character} + */ + char next(); + + /** + * Get the index for the last character returned from {@link #next()}. 
+ * + * @return the index of the last character returned + */ + int index(); + + /** + * Get the position for the last character returned from {@link #next()}. + * + * @param startIndex the starting index + * @return the position of the last character returned; never null + */ + Position position(int startIndex); + } + + /** + * A factory for Token objects, used by a {@link Tokenizer} to create tokens in the correct + * order. + */ + public interface Tokens { + /** + * Create a single-character token at the supplied index in the character stream. The token + * type is set to 0, meaning this is equivalent to calling addToken(index,index+1) + * or addToken(index,index+1,0). + * + * @param position the position (line and column numbers) of this new token; may not be null + * @param index the index of the character to appear in the token; must be a valid index in + * the stream + */ + default void addToken(Position position, int index) { + addToken(position, index, index + 1, 0); + } + + /** + * Create a single- or multi-character token with the characters in the range given by the + * starting and ending index in the character stream. The character at the ending index is + * not included in the token (as this is standard practice when using 0-based + * indexes). The token type is set to 0, meaning this is equivalent to calling + * addToken(startIndex,endIndex,0) . 
+ * + * @param position the position (line and column numbers) of this new token; may not be null + * @param startIndex the index of the first character to appear in the token; must be a + * valid index in the stream + * @param endIndex the index just past the last character to appear in the token; must be a + * valid index in the stream + */ + default void addToken(Position position, int startIndex, int endIndex) { + addToken(position, startIndex, endIndex, 0); + } + + /** + * Create a single- or multi-character token with the supplied type and with the characters + * in the range given by the starting and ending index in the character stream. The + * character at the ending index is not included in the token (as this is standard + * practice when using 0-based indexes). + * + * @param position the position (line and column numbers) of this new token; may not be null + * @param startIndex the index of the first character to appear in the token; must be a + * valid index in the stream + * @param endIndex the index just past the last character to appear in the token; must be a + * valid index in the stream + * @param type the type of the token + */ + void addToken(Position position, int startIndex, int endIndex, int type); + } + + /** + * The interface defining a token, which references the characters in the actual input character + * stream. + * + * @see CaseSensitiveTokenFactory + * @see CaseInsensitiveTokenFactory + */ + @Immutable + public interface Token { + /** + * Get the value of the token, in actual case. + * + * @return the value + */ + String value(); + + /** + * Determine if the token matches the supplied string. + * + * @param expected the expected value + * @return true if the token's value matches the supplied value, or false otherwise + */ + boolean matches(String expected); + + /** + * Determine if the token matches the supplied string and is of a requested type. 
+ * + * @param expectedType the expected token type + * @param expected the expected value + * @return true if the token's type and value matches the supplied type and value, or false + * otherwise + */ + default boolean matches(int expectedType, String expected) { + return matches(expectedType) && matches(expected); + } + + /** + * Determine if the token matches the supplied character. + * + * @param expected the expected character value + * @return true if the token's value matches the supplied character value, or false + * otherwise + */ + boolean matches(char expected); + + /** + * Determine if the token matches the supplied type. + * + * @param expectedType the expected integer type + * @return true if the token's value matches the supplied integer type, or false otherwise + */ + boolean matches(int expectedType); + + /** + * Get the type of the token. + * + * @return the token's type + */ + int type(); + + /** + * Get the index in the raw stream for the first character in the token. + * + * @return the starting index of the token + */ + int startIndex(); + + /** + * Get the index in the raw stream past the last character in the token. + * + * @return the ending index of the token, which is past the last character + */ + int endIndex(); + + /** + * Get the length of the token, which is equivalent to endIndex() - startIndex() + * . + * + * @return the length + */ + int length(); + + /** + * Get the position of this token, which includes the line number and column number of the + * first character in the token. + * + * @return the position; never null + */ + Position position(); + + /** + * Bitmask ORed with existing type value. + * + * @param typeMask the mask of types + * @return copy of Token with new type + */ + Token withType(int typeMask); + } + + /** An immutable {@link Token} that implements matching using case-sensitive logic. 
*/ + @Immutable + protected class CaseSensitiveToken implements Token { + private final int startIndex; + private final int endIndex; + private final int type; + private final Position position; + + public CaseSensitiveToken(int startIndex, int endIndex, int type, Position position) { + this.startIndex = startIndex; + this.endIndex = endIndex; + this.type = type; + this.position = position; + } + + @Override + public Token withType(int typeMask) { + int type = this.type | typeMask; + return new CaseSensitiveToken(startIndex, endIndex, type, position); + } + + @Override + public final int type() { + return type; + } + + @Override + public final int startIndex() { + return startIndex; + } + + @Override + public final int endIndex() { + return endIndex; + } + + @Override + public final int length() { + return endIndex - startIndex; + } + + @Override + public final boolean matches(char expected) { + return length() == 1 && matchString().charAt(startIndex) == expected; + } + + @Override + public boolean matches(String expected) { + return matchString().substring(startIndex, endIndex).equals(expected); + } + + @Override + public final boolean matches(int expectedType) { + return expectedType == ANY_TYPE + || (currentToken().type() & expectedType) == expectedType; + } + + @Override + public final String value() { + return inputString.substring(startIndex, endIndex); + } + + @Override + public Position position() { + return position; + } + + protected String matchString() { + return inputString; + } + + @Override + public String toString() { + return value(); + } + } + + /** An immutable {@link Token} that implements matching using case-insensitive logic. 
*/ + @Immutable + protected class CaseInsensitiveToken extends CaseSensitiveToken { + public CaseInsensitiveToken(int startIndex, int endIndex, int type, Position position) { + super(startIndex, endIndex, type, position); + } + + @Override + public boolean matches(String expected) { + return matchString().substring(startIndex(), endIndex()).toUpperCase().equals(expected); + } + + @Override + public Token withType(int typeMask) { + int type = this.type() | typeMask; + return new CaseInsensitiveToken(startIndex(), endIndex(), type, position()); + } + } + + /** An implementation of {@link Tokens} that creates {@link CaseSensitiveToken} objects. */ + protected abstract class TokenFactory implements Tokens { + protected final List tokens = new ArrayList(); + + public List getTokens() { + return tokens; + } + } + + /** An implementation of {@link Tokens} that creates {@link CaseSensitiveToken} objects. */ + public class CaseSensitiveTokenFactory extends TokenFactory { + @Override + public void addToken(Position position, int startIndex, int endIndex, int type) { + tokens.add(new CaseSensitiveToken(startIndex, endIndex, type, position)); + } + } + + /** An implementation of {@link Tokens} that creates {@link CaseInsensitiveToken} objects. */ + public class CaseInsensitiveTokenFactory extends TokenFactory { + @Override + public void addToken(Position position, int startIndex, int endIndex, int type) { + tokens.add(new CaseInsensitiveToken(startIndex, endIndex, type, position)); + } + } + + /** An implementation of {@link CharacterStream} that works with a single character array. 
*/ + public static final class CharacterArrayStream implements CharacterStream { + private final char[] content; + private int lastIndex = -1; + private final int maxIndex; + private int lineNumber = 1; + private int columnNumber = 0; + private boolean nextCharMayBeLineFeed; + + public CharacterArrayStream(char[] content) { + this.content = content; + this.maxIndex = content.length - 1; + } + + @Override + public boolean hasNext() { + return lastIndex < maxIndex; + } + + @Override + public int index() { + return lastIndex; + } + + @Override + public Position position(int startIndex) { + return new Position(startIndex, lineNumber, columnNumber); + } + + @Override + public char next() { + if (lastIndex >= maxIndex) { + throw new NoSuchElementException(); + } + char result = content[++lastIndex]; + ++columnNumber; + if (result == '\r') { + nextCharMayBeLineFeed = true; + ++lineNumber; + columnNumber = 0; + } else if (result == '\n') { + if (!nextCharMayBeLineFeed) { + ++lineNumber; + } + columnNumber = 0; + } else if (nextCharMayBeLineFeed) { + nextCharMayBeLineFeed = false; + } + return result; + } + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/Predicates.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/Predicates.java new file mode 100644 index 000000000..773f10888 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/Predicates.java @@ -0,0 +1,188 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.utils; + +import com.ververica.cdc.common.text.ParsingException; +import com.ververica.cdc.common.text.TokenStream; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** A utility class for creating {@link Predicate} instances. */ +public class Predicates { + + public static Predicate includes(String regexPatterns, Function conversion) { + Set patterns = setOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE); + return includedInPatterns(patterns, conversion); + } + + protected static Predicate includedInPatterns( + Collection patterns, Function conversion) { + return (t) -> matchedByPattern(patterns, conversion).apply(t).isPresent(); + } + + /** + * Generate a predicate function that for any supplied string returns {@code true} if any + * of the regular expressions in the supplied comma-separated list matches the predicate + * parameter. 
+ * + * @param regexPatterns the comma-separated regular expression pattern (or literal) strings; may + * not be null + * @return the predicate function that performs the matching + * @throws PatternSyntaxException if the string includes an invalid regular expression + */ + public static Predicate includes(String regexPatterns) { + return includes(regexPatterns, (str) -> str); + } + + public static Set setOfRegex(String input, int regexFlags) { + return setOf(input, RegExSplitterByComma::split, (str) -> Pattern.compile(str, regexFlags)); + } + + public static Set setOf( + String input, Function splitter, Function factory) { + if (input == null) { + return Collections.emptySet(); + } + Set matches = new LinkedHashSet<>(); + for (String item : splitter.apply(input)) { + T obj = factory.apply(item); + if (obj != null) { + matches.add(obj); + } + } + return matches; + } + + protected static Function> matchedByPattern( + Collection patterns, Function conversion) { + return (t) -> { + String str = conversion.apply(t); + if (str != null) { + for (Pattern p : patterns) { + if (p.matcher(str).matches()) { + return Optional.of(p); + } + } + } + return Optional.empty(); + }; + } + + /** + * A tokenization class used to split a comma-separated list of regular expressions. If a comma + * is part of expression then it can be prepended with '\' so it will not act as a + * separator. 
+ */ + public static class RegExSplitterByComma implements TokenStream.Tokenizer { + + public static String[] split(String identifier) { + TokenStream stream = new TokenStream(identifier, new RegExSplitterByComma(), true); + stream.start(); + + List parts = new ArrayList<>(); + + while (stream.hasNext()) { + final String part = stream.consume(); + if (part.isEmpty()) { + continue; + } + parts.add(part.trim().replace("\\,", ",")); + } + + return parts.toArray(new String[0]); + } + + @Override + public void tokenize(TokenStream.CharacterStream input, TokenStream.Tokens tokens) + throws ParsingException { + int tokenStart = 0; + while (input.hasNext()) { + char c = input.next(); + // Escape sequence + if (c == '\\') { + if (!input.hasNext()) { + throw new ParsingException( + input.position(input.index()), + "Unterminated escape sequence at the end of the string"); + } + input.next(); + } else if (c == ',') { + tokens.addToken(input.position(tokenStart), tokenStart, input.index()); + tokenStart = input.index() + 1; + } + } + tokens.addToken(input.position(tokenStart), tokenStart, input.index() + 1); + } + } + + /** + * A tokenization class used to split a dot-separated list of regular expressions. If a comma is + * part of expression then it can be prepended with '\' so it will not act as a + * separator. 
+ */ + public static class RegExSplitterByDot implements TokenStream.Tokenizer { + + public static String[] split(String identifier) { + TokenStream stream = new TokenStream(identifier, new RegExSplitterByDot(), true); + stream.start(); + + List parts = new ArrayList<>(); + + while (stream.hasNext()) { + final String part = stream.consume(); + if (part.isEmpty()) { + continue; + } + parts.add(part.trim().replace("\\.", ".")); + } + + return parts.toArray(new String[0]); + } + + @Override + public void tokenize(TokenStream.CharacterStream input, TokenStream.Tokens tokens) + throws ParsingException { + int tokenStart = 0; + while (input.hasNext()) { + char c = input.next(); + // Escape sequence + if (c == '\\') { + if (!input.hasNext()) { + throw new ParsingException( + input.position(input.index()), + "Unterminated escape sequence at the end of the string"); + } + input.next(); + } else if (c == '.') { + tokens.addToken(input.position(tokenStart), tokenStart, input.index()); + tokenStart = input.index() + 1; + } + } + tokens.addToken(input.position(tokenStart), tokenStart, input.index() + 1); + } + } +} diff --git a/flink-cdc-common/src/test/java/com/ververica/cdc/common/schema/SelectorsTest.java b/flink-cdc-common/src/test/java/com/ververica/cdc/common/schema/SelectorsTest.java new file mode 100644 index 000000000..aa10143ed --- /dev/null +++ b/flink-cdc-common/src/test/java/com/ververica/cdc/common/schema/SelectorsTest.java @@ -0,0 +1,137 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.schema; + +import com.ververica.cdc.common.event.TableId; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link Selectors}. */ +class SelectorsTest { + + @Test + public void testTableSelector() { + + // nameSpace, schemaName, tableName + Selectors selectors = + new Selectors.SelectorsBuilder() + .includeTables("db.sc1.A[0-9]+,db.sc2.B[0-1]+") + .build(); + + assertAllowed(selectors, "db", "sc1", "A1"); + assertAllowed(selectors, "db", "sc1", "A2"); + assertAllowed(selectors, "db", "sc2", "B0"); + assertAllowed(selectors, "db", "sc2", "B1"); + assertNotAllowed(selectors, "db", "sc1", "A"); + assertNotAllowed(selectors, "db", "sc1a", "B"); + assertNotAllowed(selectors, "db", "sc1", "AA"); + assertNotAllowed(selectors, "db", "sc2", "B2"); + assertNotAllowed(selectors, "db2", "sc1", "A1"); + assertNotAllowed(selectors, "db2", "sc1", "A2"); + assertNotAllowed(selectors, "db", "sc11", "A1"); + assertNotAllowed(selectors, "db", "sc1A", "A1"); + + selectors = + new Selectors.SelectorsBuilder() + .includeTables("db\\..sc1.A[0-9]+,db.sc2.B[0-1]+") + .build(); + + assertAllowed(selectors, "db1", "sc1", "A1"); + assertAllowed(selectors, "dba", "sc1", "A2"); + assertAllowed(selectors, "db", "sc2", "B0"); + assertAllowed(selectors, "db", "sc2", "B1"); + assertNotAllowed(selectors, "db", "sc1", "A"); + assertNotAllowed(selectors, "db", "sc1a", "B"); + assertNotAllowed(selectors, "db", "sc1", "AA"); + assertNotAllowed(selectors, "db", "sc2", "B2"); + assertNotAllowed(selectors, "dba1", "sc1", "A1"); + assertNotAllowed(selectors, "dba2", "sc1", "A2"); + assertNotAllowed(selectors, "db", "sc11", "A1"); + assertNotAllowed(selectors, "db", "sc1A", "A1"); + + // schemaName, tableName + selectors = + new 
Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build(); + + assertAllowed(selectors, null, "sc1", "A1"); + assertAllowed(selectors, null, "sc1", "A2"); + assertAllowed(selectors, null, "sc2", "B0"); + assertAllowed(selectors, null, "sc2", "B1"); + assertNotAllowed(selectors, "db", "sc1", "A1"); + assertNotAllowed(selectors, null, "sc1", "A"); + assertNotAllowed(selectors, null, "sc2", "B"); + assertNotAllowed(selectors, null, "sc1", "AA"); + assertNotAllowed(selectors, null, "sc11", "A1"); + assertNotAllowed(selectors, null, "sc1A", "A1"); + + // tableName + selectors = new Selectors.SelectorsBuilder().includeTables("\\.A[0-9]+,B[0-1]+").build(); + + assertAllowed(selectors, null, null, "1A1"); + assertAllowed(selectors, null, null, "AA2"); + assertAllowed(selectors, null, null, "B0"); + assertAllowed(selectors, null, null, "B1"); + assertNotAllowed(selectors, "db", "sc1", "A1"); + assertNotAllowed(selectors, null, null, "A"); + assertNotAllowed(selectors, null, null, "B"); + assertNotAllowed(selectors, null, null, "2B"); + + selectors = + new Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build(); + + assertAllowed(selectors, null, "sc1", "A1"); + assertAllowed(selectors, null, "sc1", "A2"); + assertAllowed(selectors, null, "sc1", "A2"); + assertAllowed(selectors, null, "sc2", "B0"); + assertNotAllowed(selectors, "db", "sc1", "A1"); + assertNotAllowed(selectors, null, "sc1", "A"); + assertNotAllowed(selectors, null, "sc1", "AA"); + assertNotAllowed(selectors, null, "sc2", "B"); + assertNotAllowed(selectors, null, "sc2", "B2"); + assertNotAllowed(selectors, null, "sc11", "A1"); + assertNotAllowed(selectors, null, "sc1A", "A1"); + } + + protected void assertAllowed( + Selectors filter, String nameSpace, String schemaName, String tableName) { + + TableId id = getTableId(nameSpace, schemaName, tableName); + + assertThat(filter.isMatch(id)).isTrue(); + } + + protected void assertNotAllowed( + Selectors filter, String 
nameSpace, String schemaName, String tableName) { + + TableId id = getTableId(nameSpace, schemaName, tableName); + + assertThat(filter.isMatch(id)).isFalse(); + } + + private static TableId getTableId(String nameSpace, String schemaName, String tableName) { + TableId id; + if (nameSpace == null && schemaName == null) { + id = TableId.tableId(tableName); + } else if (nameSpace == null) { + id = TableId.tableId(schemaName, tableName); + } else { + id = TableId.tableId(nameSpace, schemaName, tableName); + } + return id; + } +}