테스트 환경
- Ubuntu 22.04
- JAVA 17
- Kafka 2.12 - 3.15
- BQ Sink Connector : wepay-kafka-connect-bigquery-2.5.7 (https://www.confluent.io/hub/wepay/kafka-connect-bigquery)
- 설치경로: /opt/kafka
- 커넥터 라이브러리용 폴더 생성 및 라이브러리 복제
## create Connector DIR
mkdir /opt/kafka/connect-libs
## BigQuery Connector
wget http://domain/repo/confluent/wepay-kafka-connect-bigquery-2.5.7.zip
unzip wepay-kafka-connect-bigquery-2.5.7.zip
mv wepay-kafka-connect-bigquery-2.5.7 ./connect-libs
- 원본연결 설정파일 작성(connect-standalone-bq.properties**)**
## Kafka 서버 주소
bootstrap.servers=A:9092,B:9092
## Kafka 보안 설정에 따른 옵션 입력
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="id" password="pw";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="id" password="pw";
## Topic 형식관련 지정, Json 으로 가정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
## 스키마 관련 정보 설정
key.converter.schemas.enable=true
value.converter.schemas.enable=true
## Topic 관련 설정
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
## Lib Path
plugin.path=/opt/kafka/libs,/opt/kafka/connect-libs/
- 입력 관련 설정 파일작성(connect-bq-sink.properties)
ame=bq-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=topics
project=test-project
defaultDataset=sink_test_dataset
keyfile=/opt/kafka/config/service_account.json
sanitizeTopics=true
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
autoCreateTables=true
autoUpdateSchemas=true
allowNewBigQueryFields=true
allowBigQueryRequiredFieldRelaxation=true
transforms=RegexTransformation
transforms.RegexTransformation.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexTransformation.regex=(kcbq_)(.*)
transforms.RegexTransformation.replacement=$2
batchSize=1000
retryInterval=10000
behaviorOnError=fail
- 실행
nohup bin/connect-standalone.sh config/connect-standalone-bq.properties config/connect-bq-deb-sink.properties >> bq-sink.log 2>&1 &
상기 순서로 진행하면, Standalone 방식으로 구성이 가능하다. 분산방식을 원한다면, 관련한 옵션을 추가해서 진행하면 된다.