Integrating Filebeat with Kafka
I ran into wiring Filebeat up to Kafka at work, so I'm writing it down to share and so I don't forget it myself.
Filebeat is a log-collection agent that runs on the client side. Think of it as one ear in, one ear out: the "in" is the log file it watches, and the "out" is where the collected log content gets shipped. Here we ship it to Kafka, which is a message queue. Why Kafka? Because it is widely used, and it is what I actually had to integrate with at work. You can look up the details on your own; enough talk, let's get started.
Step 1: install Helm 3.
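One way to do it, assuming a linux-amd64 host (the Helm version below is just an example; any 3.x release from the official download host works):
wget https://get.helm.sh/helm-v3.5.2-linux-amd64.tar.gz
tar -zxvf helm-v3.5.2-linux-amd64.tar.gz
mv linux-amd64/helm /usr/local/bin/helm
helm version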
Step 2: add a Helm chart repository. Normally you would add the official repository URL, but it is not reachable from here, so use Alibaba's mirror instead; it works, tested it myself.
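Roughly like this (the repo URL is from memory, so double-check it; the repo name apphub is what the pull commands below expect):
helm repo add apphub https://apphub.aliyuncs.com
helm repo update
helm search repo apphub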
Step 3: pull the chart archives; untarring them gives you the kafka and filebeat chart directories.
helm pull apphub/kafka
helm pull apphub/filebeat
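helm pull downloads .tgz archives, so untar them to get the chart directories (or add --untar to pull and extract in one step):
tar -zxvf kafka-*.tgz
tar -zxvf filebeat-*.tgz
# equivalent: helm pull apphub/filebeat --untar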
Step 4: install the Kafka chart. The NOTES printed at the end of the install output are very useful; the inline comments below explain what each command does.
[root@VM-0-15-centos ~]# helm install kafka2 ./kafka
NAME: kafka2
LAST DEPLOYED: Fri Feb  5 22:57:45 2021
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
** Please be patient while the chart is being deployed **
Kafka can be accessed via port 9092 on the following DNS name from within your cluster:
kafka2.default.svc.cluster.local    # inside the cluster, this DNS name resolves to the IP of the Kafka service
To create a topic run the following command:    # create a topic
    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-topics.sh --create --zookeeper kafka2-zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
To list all the topics run the following command:    # list all topics
    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-topics.sh --list --zookeeper kafka2-zookeeper:2181
To start a kafka producer run the following command:    # open a producer console to publish messages to the topic
    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-console-producer.sh --broker-list localhost:9092 --topic test
To start a kafka consumer run the following command:    # consumer console, where you can see the messages the producer sent
    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# the two commands below are run inside the Kafka container: one producer, one consumer
PRODUCER:
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
CONSUMER:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
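The Filebeat config later in this post ships logs to a topic named test1111, so you can create that topic up front using the same command pattern as above (whether this is strictly needed depends on whether your broker auto-creates topics):
export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
kubectl --namespace default exec -it $POD_NAME -- kafka-topics.sh --create --zookeeper kafka2-zookeeper:2181 --replication-factor 1 --partitions 1 --topic test1111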
Next, let's install Filebeat.
There are two ways to run Filebeat:
  1. As a sidecar that collects container logs: every pod then has to run its own Filebeat container, so with 1000 pods (one app each) you end up with 2000 containers. Not worth it.
  2. As a DaemonSet, one per node: you only run as many Filebeat pods as you have nodes. We go with this option.
Step 1: edit values.yaml as shown below. The settings that matter are filebeat.inputs, which lists the files to watch, and output.kafka, which says where the output goes. We use a DNS name here; CoreDNS resolves it to an IP following the pattern <service name>.<namespace>.svc.cluster.local. topic is the name of the topic we created.
[root@VM-0-15-centos filebeat]# cat values.yaml | grep -v "#" | grep -v "^$"
image:
  repository: /beats/filebeat-oss
  tag: 7.4.0
  pullPolicy: IfNotPresent
config:
  modules:
    path: ${path.config}/modules.d/*.yml
  filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/a.log
  output.kafka:
    enabled: true
    hosts: ["kafka.default.svc.cluster.local:9092"]
    topic: test1111
  http.port: 5066
overrideConfig: {}
data:
  hostPath: /var/lib/filebeat
indexTemplateLoad: []
plugins: []
command: []
args: []
extraVars: []
extraVolumes: []
extraVolumeMounts: []
extraSecrets: {}
extraInitContainers: []
resources: {}
priorityClassName: ""
nodeSelector: {}
annotations: {}
tolerations: []
affinity: {}
rbac:
  create: true
serviceAccount:
  create: true
  name:
podSecurityPolicy:
  enabled: False
  annotations: {}
  privileged: false
monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
  image:
    repository: trustpilot/beat-exporter
    tag: 0.1.1
    pullPolicy: IfNotPresent
  resources: {}
  args: []
  exporterPort: 9479
  targetPort: 9479
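Before installing Filebeat, you can check that the DNS name you put in output.kafka actually resolves inside the cluster. A throwaway pod is one way to do it (busybox is just a convenient image that ships nslookup; use whatever you have):
kubectl run dns-test --rm -it --image=busybox --restart=Never -- nslookup kafka2.default.svc.cluster.local
The name must match the service created by your Kafka release; the NOTES above printed kafka2.default.svc.cluster.local for the kafka2 release.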
Step 2: install Filebeat.
[root@VM-0-15-centos ~]# helm install filebeat2 ./filebeat
NAME: filebeat2
LAST DEPLOYED: Fri Feb  5 23:09:51 2021
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
To verify that Filebeat has started, run:
kubectl --namespace=default get pods -l "app=filebeat,release=filebeat2"
Step 3: take a look at all the pods. The ZooKeeper pods come bundled with the Kafka chart.
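Since the chart runs Filebeat as a DaemonSet, there should be one Filebeat pod per node. A quick sanity check, reusing the pod labels from the NOTES above:
kubectl get daemonset
kubectl get pods -o wide -l "app=filebeat,release=filebeat2"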
Step 4: test it.
Exec into the Filebeat container and write a few lines into /var/a.log.
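A minimal sketch (the pod name is looked up with the same labels as above; the echoed text is arbitrary):
export FB_POD=$(kubectl get pods -l "app=filebeat,release=filebeat2" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $FB_POD -- sh -c 'echo "hello from filebeat" >> /var/a.log'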
Step 5: watch the other side, where the consumer is listening; the lines you just wrote should come through.
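This is the same consumer command from the Kafka NOTES, just pointed at the topic from values.yaml (test1111). Each log line arrives as a JSON event with the original text in the message field:
export POD_NAME=$(kubectl get pods -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1111 --from-beginning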
Addendum
If you want to monitor the logs of another container, have that application write its logs to a directory on the host, mount that host directory into the Filebeat container, and then adjust values.yaml so Filebeat watches those paths. The main work is in values.yaml; the file looks like this:
[root@iZ8vb1m9mvb3ev1tqgrldwZ shell]# cat filebeat/values.yaml | grep -v "#" | grep -v "^$"
image:
  repository: /beats/filebeat-oss
  tag: 7.4.0
  pullPolicy: IfNotPresent
config:
  modules:
    path: ${path.config}/modules.d/*.yml
  processors:
  - add_cloud_metadata:
  filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /host/var/melocal/logs/*.log
      - /host/var/geo/logs/*.log
      - /host/var/rgeo/log/*.log
  output.kafka:
    enabled: true
    hosts: ["kafka.default.svc.cluster.local:9092"]
    topic: test_topic
  http.port: 5066
overrideConfig: {}
data:
  hostPath: /var/lib/filebeat
indexTemplateLoad: []
plugins: []
command: []
args: []
extraVars: []
extraVolumes:
- hostPath:
    path: /root/jiaohang/amap-melocal/logs
  name: melocal-log
- hostPath:
    path: /root/jiaohang/amap-geo/data/geocoding/log
  name: geo-log
- hostPath:
    path: /root/jiaohang/amap-rgeo/data/reverse_geocoding/log
  name: rgeo-log
extraVolumeMounts:
- name: melocal-log
  mountPath: /host/var/melocal/logs
  readOnly: true
- name: geo-log
  mountPath: /host/var/geo/log
  readOnly: true
- name: rgeo-log
  mountPath: /host/var/rgeo/log
  readOnly: true
extraSecrets: {}
extraInitContainers: []
resources: {}
priorityClassName: ""
nodeSelector: {}
annotations: {}
tolerations: []
affinity: {}
rbac:
  create: true
serviceAccount:
  create: true
  name:
podSecurityPolicy:
  enabled: False
  annotations: {}
  privileged: false
monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
  image:
    repository: trustpilot/beat-exporter
    tag: 0.1.1
    pullPolicy: IfNotPresent
  resources: {}
  args: []
  exporterPort: 9479
  targetPort: 9479
Here I am watching the logs under three directories; once the configuration is in place, the log lines start showing up in Kafka.
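To verify, consume from test_topic the same way as before (adjust the instance label if your Kafka release is not named kafka2):
export POD_NAME=$(kubectl get pods -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning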