diff --git a/roles/kafka/defaults/main.yml b/roles/kafka/defaults/main.yml
index 3c912dc..e09383c 100644
--- a/roles/kafka/defaults/main.yml
+++ b/roles/kafka/defaults/main.yml
@@ -1,7 +1,7 @@
 ---

 kafka_scala_version: 2.11
-kafka_version: 0.9.0.1
+kafka_version: 0.10.2.1

 kafka_install_dir: /opt/kafka
 kafka_config_dir: /opt/kafka/default/config
@@ -19,15 +19,14 @@ kafka_group: kafka
 kafka_port: 9092
 kafka_jmx_port: 9999
 kafka_message_max: 10000000
-kafka_replica_fetch_max_bytes: 15000000
 kafka_consumer_message_max: 16777216
-kafka_num_partitions: "{{ groups['kafka-nodes'] | length }}"
-kafka_replication_factor: "{{ groups['kafka-nodes'] | length }}"
+kafka_num_partitions: 3
+kafka_replication_factor: 3
 kafka_log_retention_hours: 168
-kafka_offsets_topic_num_partitions: 50
-kafka_offsets_topic_replication_factor: 3
-kafka_offsets_topic_segmant_bytes: 104857600
+kafka_num_network_threads: 8
 kafka_num_io_threads: 8
+kafka_num_replica_fetchers: 6
+kafka_zookeeper_connection_timeout: 6000
 kafka_heap: 1G

 repository_infrastructure: "{{ apache_mirror }}/kafka/{{ kafka_version }}"
diff --git a/roles/kafka/handlers/main.yml b/roles/kafka/handlers/main.yml
index 7cacfc8..0116337 100644
--- a/roles/kafka/handlers/main.yml
+++ b/roles/kafka/handlers/main.yml
@@ -1,6 +1,3 @@
 ---
 - name: restart kafka
   command: supervisorctl update kafka
-  # supervisorctl:
-  #   name=kafka
-  #   state=restarted
diff --git a/roles/kafka/tasks/main.yml b/roles/kafka/tasks/main.yml
index 8a4a15d..8161257 100644
--- a/roles/kafka/tasks/main.yml
+++ b/roles/kafka/tasks/main.yml
@@ -83,8 +83,6 @@
     src=consumer.properties.j2
     dest={{ kafka_config_dir }}/consumer.properties
     mode=0644
-  notify:
-    - restart kafka
   tags: kafka

 - name: copy supervisord config
@@ -114,5 +112,3 @@
     - "{{ kafka_data_dir }}"
     - "{{ kafka_log_dir }}"
   tags: kafka
-
-# - cron: name="clear old kafka app logs" job="find /opt/kafka/default/logs -mtime +7 -exec rm {} \; > /dev/null" minute="0"
diff --git a/roles/kafka/templates/consumer.properties.j2 b/roles/kafka/templates/consumer.properties.j2
index 10b7bf4..666a128 100644
--- a/roles/kafka/templates/consumer.properties.j2
+++ b/roles/kafka/templates/consumer.properties.j2
@@ -28,5 +28,5 @@ group.id=test-consumer-group
 #consumer timeout
 #consumer.timeout.ms=5000

-# Need to increase this to play nice with message.max.bytes = 10000000
+# Need to increase this to play nice with broker message.max.bytes
 fetch.message.max.bytes={{ kafka_consumer_message_max }}
diff --git a/roles/kafka/templates/log4j.properties.j2 b/roles/kafka/templates/log4j.properties.j2
index 3e12a1e..f97a46e 100644
--- a/roles/kafka/templates/log4j.properties.j2
+++ b/roles/kafka/templates/log4j.properties.j2
@@ -13,46 +13,59 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=INFO, stdout

 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

-log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
 log4j.appender.kafkaAppender.File={{ kafka_log_dir }}/server.log
 log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.appender.kafkaAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
 log4j.appender.kafkaAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

-log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
 log4j.appender.stateChangeAppender.File={{ kafka_log_dir }}/state-change.log
 log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.appender.stateChangeAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
 log4j.appender.stateChangeAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

-log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
 log4j.appender.requestAppender.File={{ kafka_log_dir }}/kafka-request.log
 log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.appender.requestAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
 log4j.appender.requestAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

-log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
 log4j.appender.cleanerAppender.File={{ kafka_log_dir }}/log-cleaner.log
 log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.appender.cleanerAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
 log4j.appender.cleanerAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

-log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
 log4j.appender.controllerAppender.File={{ kafka_log_dir }}/controller.log
 log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.appender.controllerAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
-log4j.appender.controllerAppender.MaxFileSize=20MB
+log4j.appender.controllerAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}
+
+log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.authorizerAppender.File={{ kafka_log_dir }}/kafka-authorizer.log
+log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.authorizerAppender.MaxBackupIndex={{ kafka_log_rolling_retention_count|default(5) }}
+log4j.appender.authorizerAppender.MaxFileSize={{ kafka_log_rolling_max_size|default('10MB') }}

 # Turn on all our debugging info
 #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
@@ -60,7 +73,6 @@ log4j.appender.controllerAppender.MaxFileSize=20MB
 #log4j.logger.kafka.perf=DEBUG, kafkaAppender
 #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
-
 log4j.logger.kafka=INFO, kafkaAppender

 log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
@@ -80,3 +92,7 @@ log4j.additivity.kafka.log.LogCleaner=false

 log4j.logger.state.change.logger=TRACE, stateChangeAppender
 log4j.additivity.state.change.logger=false
+
+#Change this to debug to get the actual audit log for authorizer.
+log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
+log4j.additivity.kafka.authorizer.logger=false
diff --git a/roles/kafka/templates/server.properties.j2 b/roles/kafka/templates/server.properties.j2
index f991202..be89f30 100644
--- a/roles/kafka/templates/server.properties.j2
+++ b/roles/kafka/templates/server.properties.j2
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 # see kafka.server.KafkaConfig for additional details and defaults

 ############################# Server Basics #############################
@@ -21,47 +22,61 @@
 {%- if host == inventory_hostname -%}broker.id={{ loop.index }}{%- endif -%}
 {% endfor %}

-# The maximum message size the broker can receive (10mb as of 8/28/14)
-message.max.bytes={{ kafka_message_max }}
-replica.fetch.max.bytes={{ kafka_replica_fetch_max_bytes }}
-
-# Can delete topics in Kafka 0.8.2.0
+{% if kafka_inter_broker_protocol_version is defined and kafka_inter_broker_protocol_version %}
+# Specify which version of the inter-broker protocol will be used. This is
+# typically bumped after all brokers were upgraded to a new version. Example
+# of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1,
+# 0.9.0.0, 0.9.0.1, 0.10.1. Check ApiVersion for the full list.
+inter.broker.protocol.version={{ kafka_inter_broker_protocol_version }}
+{% endif %}
+
+{% if kafka_log_message_format_version is defined and kafka_log_message_format_version %}
+# Specify the message format version the broker will use to append messages to the
+# logs. The value should be a valid ApiVersion. Some examples are: 0.8.2, 0.9.0.0,
+# 0.10.0, 0.10.1, check ApiVersion for more details. By setting a particular message format
+# version, the user is certifying that all the existing messages on disk are smaller
+# or equal than the specified version. Setting this value incorrectly will cause
+# consumers with older versions to break as they will receive messages with a format
+# that they don't understand.
+log.message.format.version={{ kafka_log_message_format_version }}
+{% endif %}
+
+# The maximum message size the broker can receive
+message.max.bytes={{ kafka_message_max|default(1000012) }}
+
+# Switch to enable topic deletion or not, default value is false
 delete.topic.enable=true

 ############################# Socket Server Settings #############################

-# The port the socket server listens on
-port={{ kafka_port }}
-
-# Hostname the broker will bind to. If not set, the server will bind to all interfaces
-# host.name={{ inventory_hostname }}
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+# FORMAT:
+# listeners = security_protocol://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://{{ inventory_hostname }}:{{ kafka_port|default(9092) }}

-# Hostname the broker will advertise to producers and consumers. If not set, it uses the
-# value for "host.name" if configured. Otherwise, it will use the value returned from
-# java.net.InetAddress.getCanonicalHostName().
-advertised.host.name={{ inventory_hostname }}
-
-# The port to publish to ZooKeeper for clients to use. If this is not set,
-# it will publish the same port that the broker binds to.
-advertised.port={{ kafka_port }}
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured. Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+advertised.listeners=PLAINTEXT://{{ inventory_hostname }}:{{ kafka_port|default(9092) }}

 # The number of threads handling network requests
-# Apache Docs recommend setting to this 8 in production
-num.network.threads=8
+num.network.threads={{ kafka_num_network_threads|default(3) }}

 # The number of threads doing disk I/O
-num.io.threads={{ kafka_num_io_threads }}
+num.io.threads={{ kafka_num_io_threads|default(8) }}

 # The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=1048576
+socket.send.buffer.bytes=102400

 # The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=1048576
+socket.receive.buffer.bytes=102400

 # The maximum size of a request that the socket server will accept (protection against OOM)
 socket.request.max.bytes=104857600
-

 ############################# Log Basics #############################

 # A comma seperated list of directories under which to store log files
@@ -70,13 +85,18 @@ log.dirs={% for dir in kafka_data_log_dir %}{{dir}},{% endfor %}

 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
 # the brokers.
-num.partitions={{ kafka_num_partitions }}
+num.partitions={{ kafka_num_partitions|default(1) }}

-# Add redundancy across brokers
-default.replication.factor={{ kafka_replication_factor }}
+# Default replication factors for automatically created topics
+default.replication.factor={{ kafka_replication_factor|default(1) }}

-# Increase IO between replication brokers
-num.replica.fetchers=4
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+# Number of fetcher threads used to replicate messages from a source broker. Increasing this
+# value can increase the degree of I/O parallelism in the follower broker.
+num.replica.fetchers={{ kafka_num_replica_fetchers|default(1) }}

 ############################# Log Flush Policy #############################
@@ -103,35 +123,18 @@ num.replica.fetchers=4
 # from the end of the log.

 # The minimum age of a log file to be eligible for deletion
-log.retention.hours={{ kafka_log_retention_hours }}
+log.retention.hours={{ kafka_log_retention_hours|default(168) }}

 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
 # segments don't drop below log.retention.bytes.
 #log.retention.bytes=1073741824

 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes=536870912
+log.segment.bytes=1073741824

 # The interval at which log segments are checked to see if they can be deleted according
 # to the retention policies
-log.retention.check.interval.ms=60000
-
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-# Always keep enabled now: https://issues.apache.org/jira/browse/KAFKA-3000
-log.cleaner.enable=true
-
-# The number of partitions for the offset commit topic (should not change after deployment)
-offsets.topic.num.partitions={{ kafka_offsets_topic_num_partitions|default(50) }}
-
-# The replication factor for the offsets topic (set higher to ensure availability). To ensure that the effective replication
-# factor of the offsets topic is the configured value, the number of alive brokers has to be at least the replication factor
-# at the time of the first request for the offsets topic. If not, either the offsets topic creation will fail or it will get
-# a replication factor of min(alive brokers, configured replication factor)
-offsets.topic.replication.factor={{ kafka_offsets_topic_replication_factor|default(3) }}
-
-# The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads
-offsets.topic.segment.bytes={{ kafka_offsets_topic_segmant_bytes|default(104857600) }}
+log.retention.check.interval.ms=300000

 ############################# Zookeeper #############################

@@ -143,4 +146,4 @@ offsets.topic.segment.bytes={{ kafka_offsets_topic_segmant_bytes|default(1048576
 zookeeper.connect={% for host in zookeeper_host_list %}{{ host }}:{{ zookeeper_client_port|default(2181) }}{% if not loop.last %},{% endif %}{% endfor %}

 # Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms={{ kafka_zookeeper_connection_timeout|default(6000) }}
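
Note on verifying the template changes: the rendered server.properties now relies on advertised.listeners and the |default() fallbacks instead of the removed port / advertised.host.name settings. One quick way to sanity-check that logic without running the playbook is to render the affected lines with plain Jinja2. The snippet below is an illustrative sketch only, not part of the role; the broker hostname and ZooKeeper host list are made-up placeholder values, and zookeeper_client_port is deliberately left unset so the |default(2181) fallback is exercised.

# Minimal Jinja2 rendering sketch (assumed placeholder values, not part of the role).
from jinja2 import Template

# The advertised.listeners line and the zookeeper.connect loop as they appear in
# server.properties.j2 after this change.
snippet = (
    "advertised.listeners=PLAINTEXT://{{ inventory_hostname }}:{{ kafka_port|default(9092) }}\n"
    "zookeeper.connect="
    "{% for host in zookeeper_host_list %}"
    "{{ host }}:{{ zookeeper_client_port|default(2181) }}"
    "{% if not loop.last %},{% endif %}"
    "{% endfor %}"
)

print(Template(snippet).render(
    inventory_hostname="kafka-01.example.com",        # placeholder broker hostname
    kafka_port=9092,
    zookeeper_host_list=["zk-01", "zk-02", "zk-03"],   # placeholder ZooKeeper hosts
))
# Expected output:
#   advertised.listeners=PLAINTEXT://kafka-01.example.com:9092
#   zookeeper.connect=zk-01:2181,zk-02:2181,zk-03:2181

If the rendered output shows a trailing comma or an unexpected port, the template logic (rather than the inventory) is the place to look.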