public class KafkaInputFormat
extends org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable>
implements org.apache.hadoop.conf.Configurable

An InputFormat for reading data from Kafka. Records are read in their raw byte form, with each key and value wrapped in a BytesWritable instance.
Populating the configuration of the input format is handled with the convenience method
writeOffsetsToConfiguration(Map, Configuration). This should be done to ensure
the Kafka offset information is available when the input format creates its splits
and readers.

To suppress warnings generated by unused configs in the ConsumerConfig, use
tagExistingKafkaConnectionProperties and generateConnectionPropertyKey to prefix
Kafka connection properties with "org.apache.crunch.kafka.connection.properties";
the prefixed properties can be retrieved later using getConnectionPropertyFromKey
and filterConnectionProperties.

| Constructor and Description |
|---|
| KafkaInputFormat() | 
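The offsets map passed to writeOffsetsToBundle and writeOffsetsToConfiguration maps each topic partition to a pair of start and end offsets. The sketch below illustrates that shape and the validity check implied by getOffsets (which throws IllegalStateException when a partition's offsets are not set properly). It uses plain JDK stand-ins — the nested `TopicPartition` and `OffsetRange` records below are illustrations only, not the real `org.apache.kafka.common.TopicPartition` or `org.apache.crunch.Pair` classes, and the validation rule shown is one plausible reading, not the library's exact logic.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetsSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (illustration only).
    record TopicPartition(String topic, int partition) {}

    // Stand-in for org.apache.crunch.Pair<Long, Long>: a start and end offset.
    record OffsetRange(long start, long end) {}

    // Builds the kind of map writeOffsetsToConfiguration(Map, Configuration)
    // expects: one start/end offset range per topic partition.
    static Map<TopicPartition, OffsetRange> buildOffsets() {
        Map<TopicPartition, OffsetRange> offsets = new HashMap<>();
        offsets.put(new TopicPartition("events", 0), new OffsetRange(100L, 250L));
        offsets.put(new TopicPartition("events", 1), new OffsetRange(90L, 90L));
        return offsets;
    }

    // Mirrors the check implied by getOffsets: reject ranges that are
    // not set properly (negative start, or end before start). Assumption,
    // not the actual Crunch implementation.
    static void validate(Map<TopicPartition, OffsetRange> offsets) {
        for (Map.Entry<TopicPartition, OffsetRange> e : offsets.entrySet()) {
            OffsetRange r = e.getValue();
            if (r.start() < 0 || r.end() < r.start()) {
                throw new IllegalStateException("Bad offsets for " + e.getKey());
            }
        }
    }

    public static void main(String[] args) {
        Map<TopicPartition, OffsetRange> offsets = buildOffsets();
        validate(offsets); // no exception: every range is well-formed
        System.out.println(offsets.size() + " partitions configured");
    }
}
```

In the real API the populated map is handed to writeOffsetsToConfiguration (or writeOffsetsToBundle) before the job runs, so the splits and readers can find it.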
| Modifier and Type | Method and Description | 
|---|---|
| org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable> | createRecordReader(org.apache.hadoop.mapreduce.InputSplit inputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext) |
| static Properties | filterConnectionProperties(Properties props) Filters out Kafka connection properties that were tagged using generateConnectionPropertyKey. |
| org.apache.hadoop.conf.Configuration | getConf() |
| static Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> | getOffsets(org.apache.hadoop.conf.Configuration configuration) Reads the configuration to determine which topics, partitions, and offsets should be used for reading data. |
| List<org.apache.hadoop.mapreduce.InputSplit> | getSplits(org.apache.hadoop.mapreduce.JobContext jobContext) |
| void | setConf(org.apache.hadoop.conf.Configuration configuration) |
| static Properties | tagExistingKafkaConnectionProperties(Properties connectionProperties) Generates a Properties object containing the properties in connectionProperties, but with every property prefixed with "org.apache.crunch.kafka.connection.properties". |
| static void | writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle bundle) Writes the Kafka connection properties to the bundle. |
| static void | writeOffsetsToBundle(Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> offsets, FormatBundle bundle) Writes the start and end offsets for the provided topic partitions to the bundle. |
| static void | writeOffsetsToConfiguration(Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> offsets, org.apache.hadoop.conf.Configuration config) Writes the start and end offsets for the provided topic partitions to the config. |
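The connection-property tagging described above is, at its core, key prefixing: tagExistingKafkaConnectionProperties prepends "org.apache.crunch.kafka.connection.properties" to each key, and filterConnectionProperties later recovers the tagged entries. The self-contained sketch below demonstrates that scheme; the `tag` and `filter` helpers, the "." separator after the prefix, and the prefix-stripping on retrieval are assumptions for illustration, not the actual Crunch implementation.

```java
import java.util.Properties;

public class ConnectionPropsSketch {

    // Prefix documented in KafkaInputFormat's javadoc.
    static final String PREFIX = "org.apache.crunch.kafka.connection.properties";

    // Sketch of tagExistingKafkaConnectionProperties: prefix every key
    // (the "." separator is an assumption for this illustration).
    static Properties tag(Properties connectionProperties) {
        Properties tagged = new Properties();
        for (String name : connectionProperties.stringPropertyNames()) {
            tagged.setProperty(PREFIX + "." + name, connectionProperties.getProperty(name));
        }
        return tagged;
    }

    // Sketch of filterConnectionProperties: keep only the tagged entries,
    // recovering the original key (assumes the prefix is stripped on retrieval).
    static Properties filter(Properties props) {
        Properties filtered = new Properties();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(PREFIX + ".")) {
                filtered.setProperty(name.substring(PREFIX.length() + 1),
                                     props.getProperty(name));
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Properties conn = new Properties();
        conn.setProperty("bootstrap.servers", "broker1:9092");
        conn.setProperty("security.protocol", "SASL_SSL");

        Properties tagged = tag(conn);               // keys now carry the prefix
        tagged.setProperty("mapreduce.job.name", "kafka-read"); // untagged entry

        Properties recovered = filter(tagged);       // only tagged entries survive
        System.out.println(recovered.getProperty("bootstrap.servers"));
    }
}
```

Because the tagged keys no longer look like ConsumerConfig settings, they pass through the job configuration without triggering unused-config warnings, yet remain recoverable when the connection is established.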
**getSplits**

public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext jobContext) throws IOException, InterruptedException

Specified by:
    getSplits in class org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable>
Throws:
    IOException
    InterruptedException

**createRecordReader**

public org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable> createRecordReader(org.apache.hadoop.mapreduce.InputSplit inputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException

Specified by:
    createRecordReader in class org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable>
Throws:
    IOException
    InterruptedException

**setConf**

public void setConf(org.apache.hadoop.conf.Configuration configuration)

Specified by:
    setConf in interface org.apache.hadoop.conf.Configurable

**getConf**

public org.apache.hadoop.conf.Configuration getConf()

Specified by:
    getConf in interface org.apache.hadoop.conf.Configurable

**writeOffsetsToBundle**

public static void writeOffsetsToBundle(Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> offsets, FormatBundle bundle)

Writes the start and end offsets for the provided topic partitions to the bundle.

Parameters:
    offsets - the starting and ending offsets for the topics and partitions.
    bundle - the bundle into which the information should be persisted.

**writeOffsetsToConfiguration**

public static void writeOffsetsToConfiguration(Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> offsets, org.apache.hadoop.conf.Configuration config)

Writes the start and end offsets for the provided topic partitions to the config.

Parameters:
    offsets - the starting and ending offsets for the topics and partitions.
    config - the config into which the information should be persisted.

**getOffsets**

public static Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> getOffsets(org.apache.hadoop.conf.Configuration configuration)

Reads the configuration to determine which topics, partitions, and offsets should be used for reading data.

Parameters:
    configuration - the configuration from which to derive the data to read.
Returns:
    a map of TopicPartition to a pair of start and end offsets.
Throws:
    IllegalStateException - if the configuration does not have the start and end offsets set properly for a partition.

**writeConnectionPropertiesToBundle**

public static void writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle bundle)

Writes the Kafka connection properties to the bundle.

Parameters:
    connectionProperties - the Kafka connection properties.
    bundle - the bundle into which the information should be persisted.

**tagExistingKafkaConnectionProperties**

public static Properties tagExistingKafkaConnectionProperties(Properties connectionProperties)

Generates a Properties object containing the properties in connectionProperties, but with every property prefixed with "org.apache.crunch.kafka.connection.properties".

Parameters:
    connectionProperties - the properties to be prefixed with "org.apache.crunch.kafka.connection.properties".
Returns:
    a Properties object representing Kafka connection properties.

**filterConnectionProperties**

public static Properties filterConnectionProperties(Properties props)

Filters out Kafka connection properties that were tagged using generateConnectionPropertyKey.

Parameters:
    props - the properties to be filtered.
Returns:
    the properties that were tagged with generateConnectionPropertyKey(String).

Copyright © 2017 The Apache Software Foundation. All rights reserved.