Kafka Sink connector for BigQuery

테스트 환경

  1. 커넥터 라이브러리용 폴더 생성 및 라이브러리 복제

## create Connector DIR
mkdir /opt/kafka/connect-libs
 

## BigQuery Connector
wget http://domain/repo/confluent/wepay-kafka-connect-bigquery-2.5.7.zip
unzip wepay-kafka-connect-bigquery-2.5.7.zip
mv wepay-kafka-connect-bigquery-2.5.7 ./connect-libs
  1. 원본연결 설정파일 작성(connect-standalone-bq.properties**)**

## Kafka 서버 주소
bootstrap.servers=A:9092,B:9092


## Kafka 보안 설정에 따른 옵션 입력
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="id" password="pw";
 
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="id" password="pw";


## Topic 형식관련 지정, Json 으로 가정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter


## 스키마 관련 정보 설정
key.converter.schemas.enable=true
value.converter.schemas.enable=true


## Topic 관련 설정
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000


## Lib Path
plugin.path=/opt/kafka/libs,/opt/kafka/connect-libs/
  1. 입력 관련 설정 파일작성(connect-bq-sink.properties)
ame=bq-sink
 
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
 
tasks.max=1
 
topics=topics
 
project=test-project
defaultDataset=sink_test_dataset
 
keyfile=/opt/kafka/config/service_account.json

sanitizeTopics=true
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
 
autoCreateTables=true
autoUpdateSchemas=true
 
allowNewBigQueryFields=true
allowBigQueryRequiredFieldRelaxation=true
 
transforms=RegexTransformation
transforms.RegexTransformation.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexTransformation.regex=(kcbq_)(.*)
transforms.RegexTransformation.replacement=$2

batchSize=1000
retryInterval=10000
behaviorOnError=fail
  1. 실행
nohup bin/connect-standalone.sh config/connect-standalone-bq.properties config/connect-bq-deb-sink.properties >> bq-sink.log 2>&1 &

상기 순서로 진행하면, Standalone 방식으로 구성이 가능하다. 분산방식을 원한다면, 관련한 옵션을 추가해서 진행하면 된다.