Updated Kafka role for 0.10.x #66

Open · wants to merge 9 commits into base: master
13 changes: 6 additions & 7 deletions roles/kafka/defaults/main.yml
@@ -1,7 +1,7 @@
---

kafka_scala_version: 2.11
kafka_version: 0.9.0.1
kafka_version: 0.10.2.1

kafka_install_dir: /opt/kafka
kafka_config_dir: /opt/kafka/default/config
@@ -19,15 +19,14 @@ kafka_group: kafka
kafka_port: 9092
kafka_jmx_port: 9999
kafka_message_max: 10000000
kafka_replica_fetch_max_bytes: 15000000
kafka_consumer_message_max: 16777216
kafka_num_partitions: "{{ groups['kafka-nodes'] | length }}"
kafka_replication_factor: "{{ groups['kafka-nodes'] | length }}"
kafka_num_partitions: 3
kafka_replication_factor: 3
kafka_log_retention_hours: 168
kafka_offsets_topic_num_partitions: 50
kafka_offsets_topic_replication_factor: 3
kafka_offsets_topic_segmant_bytes: 104857600
kafka_num_network_threads: 8
kafka_num_io_threads: 8
kafka_num_replica_fetchers: 6
kafka_zookeeper_connection_timeout: 6000
kafka_heap: 1G

repository_infrastructure: "{{ apache_mirror }}/kafka/{{ kafka_version }}"
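
For context, these defaults are intended to be overridden per environment rather than edited inside the role. A minimal group_vars sketch using the variable names from the defaults above (the values are illustrative, not recommendations):

# group_vars/kafka-nodes.yml -- illustrative overrides of the role defaults above
kafka_scala_version: 2.11
kafka_version: 0.10.2.1
kafka_message_max: 10000000            # broker message.max.bytes
kafka_consumer_message_max: 16777216   # consumer fetch.message.max.bytes
kafka_num_partitions: 6                # default partition count for auto-created topics
kafka_replication_factor: 3
kafka_num_network_threads: 8
kafka_num_io_threads: 8
kafka_num_replica_fetchers: 6
kafka_heap: 4G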
3 changes: 0 additions & 3 deletions roles/kafka/handlers/main.yml
@@ -1,6 +1,3 @@
---
- name: restart kafka
command: supervisorctl update kafka
# supervisorctl:
# name=kafka
# state=restarted
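
The handler shells out to supervisorctl through the command module; the commented lines removed here showed the equivalent using Ansible's supervisorctl module. A hedged sketch of that form, assuming the broker is registered with supervisord under the program name "kafka":

- name: restart kafka
  supervisorctl:
    name: kafka
    state: restarted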
4 changes: 0 additions & 4 deletions roles/kafka/tasks/main.yml
@@ -83,8 +83,6 @@
src=consumer.properties.j2
dest={{ kafka_config_dir }}/consumer.properties
mode=0644
notify:
- restart kafka
tags: kafka

- name: copy supervisord config
@@ -114,5 +112,3 @@
- "{{ kafka_data_dir }}"
- "{{ kafka_log_dir }}"
tags: kafka

# - cron: name="clear old kafka app logs" job="find /opt/kafka/default/logs -mtime +7 -exec rm {} \; > /dev/null" minute="0"
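
The commented cron entry dropped at the end of this file was meant to clear out old Kafka application logs. If that cleanup is still wanted, a hedged sketch of the same job as a regular task using Ansible's cron module (path and retention reuse the values from the removed comment):

- name: clear old kafka app logs
  cron:
    name: "clear old kafka app logs"
    job: "find /opt/kafka/default/logs -mtime +7 -exec rm {} \\; > /dev/null"
    minute: "0"
  tags: kafka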
2 changes: 1 addition & 1 deletion roles/kafka/templates/consumer.properties.j2
@@ -28,5 +28,5 @@ group.id=test-consumer-group
#consumer timeout
#consumer.timeout.ms=5000

# Need to increase this to play nice with message.max.bytes = 10000000
# Need to increase this to play nice with broker message.max.bytes
fetch.message.max.bytes={{ kafka_consumer_message_max }}
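
For reference, with the defaults earlier in this diff the two related limits render as follows (broker value from defaults/main.yml via server.properties.j2, consumer value from this template); the consumer fetch size has to be at least as large as the broker's maximum message size, or oversized messages become unfetchable:

# broker   (server.properties):   message.max.bytes=10000000
# consumer (consumer.properties): fetch.message.max.bytes=16777216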
32 changes: 24 additions & 8 deletions roles/kafka/templates/log4j.properties.j2
@@ -13,54 +13,66 @@
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO, stdout
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File={{ kafka_log_dir }}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
log4j.appender.kafkaAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File={{ kafka_log_dir }}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stateChangeAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
log4j.appender.stateChangeAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File={{ kafka_log_dir }}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.requestAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
log4j.appender.requestAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File={{ kafka_log_dir }}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
log4j.appender.cleanerAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File={{ kafka_log_dir }}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.controllerAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
log4j.appender.controllerAppender.MaxFileSize=20MB
log4j.appender.controllerAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File={{ kafka_log_dir }}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.authorizerAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
log4j.appender.authorizerAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

# Turn on all our debugging info
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
#log4j.logger.kafka.perf=DEBUG, kafkaAppender
#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG

log4j.logger.kafka=INFO, kafkaAppender

log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
@@ -80,3 +92,7 @@ log4j.additivity.kafka.log.LogCleaner=false

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

#Change this to debug to get the actual audit log for authorizer.
log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
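
Rolling behaviour for every appender above is controlled by two optional role variables; a hedged group_vars sketch (the variable names come from the template, the values are illustrative):

kafka_log_rolling_retention_count: 10   # rendered into each appender's MaxBackupIndex
kafka_log_rolling_max_size: 50MB        # rendered into each appender's MaxFileSize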
103 changes: 53 additions & 50 deletions roles/kafka/templates/server.properties.j2
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################
@@ -21,47 +21,61 @@
{%- if host == inventory_hostname -%}broker.id={{ loop.index }}{%- endif -%}
{% endfor %}

# The maximum message size the broker can receive (10mb as of 8/28/14)
message.max.bytes={{ kafka_message_max }}
replica.fetch.max.bytes={{ kafka_replica_fetch_max_bytes }}

# Can delete topics in Kafka 0.8.2.0
{% if kafka_inter_broker_protocol_version is defined and kafka_inter_broker_protocol_version %}
# Specify which version of the inter-broker protocol will be used. This is
# typically bumped after all brokers were upgraded to a new version. Example
# of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1,
# 0.9.0.0, 0.9.0.1, 0.10.1. Check ApiVersion for the full list.
inter.broker.protocol.version={{ kafka_inter_broker_protocol_version }}
{% endif %}

{% if kafka_log_message_format_version is defined and kafka_log_message_format_version %}
# Specify the message format version the broker will use to append messages to the
# logs. The value should be a valid ApiVersion. Some examples are: 0.8.2, 0.9.0.0,
# 0.10.0, 0.10.1, check ApiVersion for more details. By setting a particular message format
# version, the user is certifying that all the existing messages on disk are smaller
# or equal than the specified version. Setting this value incorrectly will cause
# consumers with older versions to break as they will receive messages with a format
# that they don't understand.
log.message.format.version={{ kafka_log_message_format_version }}
{% endif %}

# The maximum message size the broker can receive
message.max.bytes={{ kafka_message_max|default(1000012) }}

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true

############################# Socket Server Settings #############################

# The port the socket server listens on
port={{ kafka_port }}

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
# host.name={{ inventory_hostname }}
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://{{ inventory_hostname }}:{{ kafka_port|default(9092) }}

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name={{ inventory_hostname }}

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port={{ kafka_port }}
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://{{ inventory_hostname }}:{{ kafka_port|default(9092) }}

# The number of threads handling network requests
# Apache Docs recommend setting to this 8 in production
num.network.threads=8
num.network.threads={{ kafka_num_network_threads|default(3) }}

# The number of threads doing disk I/O
num.io.threads={{ kafka_num_io_threads }}
num.io.threads={{ kafka_num_io_threads|default(8) }}

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
@@ -70,13 +85,18 @@ log.dirs={% for dir in kafka_data_log_dir %}{{dir}},{% endfor %}
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions={{ kafka_num_partitions }}
num.partitions={{ kafka_num_partitions|default(1) }}

# Add redundancy across brokers
default.replication.factor={{ kafka_replication_factor }}
# Default replication factors for automatically created topics
default.replication.factor={{ kafka_replication_factor|default(1) }}

# Increase IO between replication brokers
num.replica.fetchers=4
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

# Number of fetcher threads used to replicate messages from a source broker. Increasing this
# value can increase the degree of I/O parallelism in the follower broker.
num.replica.fetchers={{ kafka_num_replica_fetchers|default(1) }}

############################# Log Flush Policy #############################

@@ -103,35 +123,18 @@ num.replica.fetchers=4
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours={{ kafka_log_retention_hours }}
log.retention.hours={{ kafka_log_retention_hours|default(168) }}

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
# Always keep enabled now: https://issues.apache.org/jira/browse/KAFKA-3000
log.cleaner.enable=true

# The number of partitions for the offset commit topic (should not change after deployment)
offsets.topic.num.partitions={{ kafka_offsets_topic_num_partitions|default(50) }}

# The replication factor for the offsets topic (set higher to ensure availability). To ensure that the effective replication
# factor of the offsets topic is the configured value, the number of alive brokers has to be at least the replication factor
# at the time of the first request for the offsets topic. If not, either the offsets topic creation will fail or it will get
# a replication factor of min(alive brokers, configured replication factor)
offsets.topic.replication.factor={{ kafka_offsets_topic_replication_factor|default(3) }}

# The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads
offsets.topic.segment.bytes={{ kafka_offsets_topic_segmant_bytes|default(104857600) }}
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

@@ -143,4 +146,4 @@ offsets.topic.segment.bytes={{ kafka_offsets_topic_segmant_bytes|default(104857600) }}
zookeeper.connect={% for host in zookeeper_host_list %}{{ host }}:{{ zookeeper_client_port|default(2181) }}{% if not loop.last %},{% endif %}{% endfor %}

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
zookeeper.connection.timeout.ms={{ kafka_zookeeper_connection_timeout|default(6000) }}
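
The new inter.broker.protocol.version and log.message.format.version blocks are there to support rolling upgrades. A hedged sketch of how those variables might be set while moving an existing cluster from 0.9.0.1 to 0.10.2.1, following the usual two-phase Kafka upgrade pattern (exact versions and timing depend on the cluster and its clients):

# Phase 1: roll out the 0.10.2.1 binaries while still speaking the old protocol and format
kafka_version: 0.10.2.1
kafka_inter_broker_protocol_version: 0.9.0.1
kafka_log_message_format_version: 0.9.0.1

# Phase 2: after every broker runs 0.10.2.1, bump the protocol first, then the message format
# kafka_inter_broker_protocol_version: 0.10.2
# kafka_log_message_format_version: 0.10.2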