feat: kafka启用krb5认证
This commit is contained in:
@@ -20,11 +20,18 @@ func InitConfig() {
|
||||
for _, v := range addrsArr {
|
||||
addrs = append(addrs, v.(string))
|
||||
}
|
||||
|
||||
// 实例
|
||||
k := Kafka{
|
||||
Addrs: addrs,
|
||||
}
|
||||
k.NewConfig()
|
||||
k.Config.Net.SASL.Enable = false
|
||||
|
||||
// 是否启用krb5认证
|
||||
krb5Enable := config.Get("nmsCXY.kafka.krb5.enable").(bool)
|
||||
if krb5Enable {
|
||||
k.NewKerberosConfig()
|
||||
}
|
||||
KInitConm = k
|
||||
}
|
||||
|
||||
@@ -34,6 +41,7 @@ type Kafka struct {
|
||||
Config *sarama.Config
|
||||
}
|
||||
|
||||
// NewConfig 默认基础配置
|
||||
func (k *Kafka) NewConfig() {
|
||||
// 设置Kafka配置
|
||||
config := sarama.NewConfig()
|
||||
@@ -45,6 +53,35 @@ func (k *Kafka) NewConfig() {
|
||||
k.Config = config
|
||||
}
|
||||
|
||||
// NewKerberosConfig 认证krb5登录
|
||||
func (k *Kafka) NewKerberosConfig() {
|
||||
configPath := config.Get("nmsCXY.kafka.krb5.config.configPath").(string) // /path/to/krb5.conf
|
||||
keyTabPath := config.Get("nmsCXY.kafka.krb5.config.keyTabPath").(string) // /path/to/keytab
|
||||
serviceName := config.Get("nmsCXY.kafka.krb5.config.serviceName").(string) // kafka
|
||||
realm := config.Get("nmsCXY.kafka.krb5.config.realm").(string) // EXAMPLE.COM
|
||||
username := config.Get("nmsCXY.kafka.krb5.config.username").(string) // client
|
||||
// krb5Enable := config.Get("nmsCXY.kafka.krb5.enable").(bool)
|
||||
k.Config.Net.SASL.Enable = true
|
||||
k.Config.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
|
||||
k.Config.Net.SASL.GSSAPI = sarama.GSSAPIConfig{
|
||||
AuthType: sarama.KRB5_KEYTAB_AUTH,
|
||||
KerberosConfigPath: configPath,
|
||||
KeyTabPath: keyTabPath,
|
||||
ServiceName: serviceName,
|
||||
Realm: realm,
|
||||
Username: username,
|
||||
}
|
||||
|
||||
client, err := sarama.NewKerberosClient(&k.Config.Net.SASL.GSSAPI)
|
||||
if err != nil {
|
||||
logger.Fatalf("kafka kerberos NewKerberosClient err %v", err)
|
||||
}
|
||||
err = client.Login()
|
||||
if err != nil {
|
||||
logger.Fatalf("kafka kerberos Login err %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// MessageSyncSend 消息同步发送
|
||||
func (k *Kafka) MessageSyncSend(topic string, partition int32, msg string) (int32, int64, error) {
|
||||
// 创建Kafka生产者
|
||||
|
||||
Reference in New Issue
Block a user