public class KafkaInputFormat
extends org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable>
implements org.apache.hadoop.conf.Configurable

An InputFormat for reading data from Kafka. Keys and values are read in their raw byte form, each wrapped in a BytesWritable instance.
Populating the configuration of the input format is handled with the convenience method writeOffsetsToConfiguration(Map, Configuration). This should be done to ensure the Kafka offset information is available when the input format creates its splits and readers.
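To illustrate the round trip described above, the sketch below stores start and end offsets for each topic partition under flat string keys and reads them back, the way an InputFormat would when building splits. A plain Map stands in for the Hadoop Configuration (which is likewise a string key/value store), and the key layout is hypothetical; the real key names are an internal detail of KafkaInputFormat.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetConfigSketch {
    // Hypothetical key layout; KafkaInputFormat's actual config keys are internal.
    static String key(String topic, int partition, String bound) {
        return "kafka.offsets." + topic + "." + partition + "." + bound;
    }

    // Analogous to writeOffsetsToConfiguration: persist start/end offsets per partition.
    // Map keys here use a simple "topic-partition" form for the sketch.
    static void writeOffsets(Map<String, Long[]> offsets, Map<String, String> conf) {
        for (Map.Entry<String, Long[]> e : offsets.entrySet()) {
            String[] tp = e.getKey().split("-");
            int partition = Integer.parseInt(tp[1]);
            conf.put(key(tp[0], partition, "start"), String.valueOf(e.getValue()[0]));
            conf.put(key(tp[0], partition, "end"), String.valueOf(e.getValue()[1]));
        }
    }

    // Analogous to getOffsets: read the offsets back, failing loudly when a
    // partition's range was never written (mirroring the IllegalStateException
    // documented for getOffsets).
    static long[] readOffsets(String topic, int partition, Map<String, String> conf) {
        String start = conf.get(key(topic, partition, "start"));
        String end = conf.get(key(topic, partition, "end"));
        if (start == null || end == null) {
            throw new IllegalStateException("Offsets not set for " + topic + "/" + partition);
        }
        return new long[] { Long.parseLong(start), Long.parseLong(end) };
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        Map<String, Long[]> offsets = new HashMap<>();
        offsets.put("events-0", new Long[] { 0L, 500L });
        writeOffsets(offsets, conf);
        long[] readBack = readOffsets("events", 0, conf);
        System.out.println(readBack[0] + "," + readBack[1]);
    }
}
```

Writing the offsets before job submission is what guarantees that getSplits and the record readers see the same partition ranges on every node.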
To suppress warnings generated by unused configs in the ConsumerConfig, one can use tagExistingKafkaConnectionProperties and generateConnectionPropertyKey to prefix Kafka connection properties with "org.apache.crunch.kafka.connection.properties", allowing retrieval later using getConnectionPropertyFromKey and filterConnectionProperties.

| Constructor and Description |
|---|
| KafkaInputFormat() |
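The tagging scheme described above can be sketched with plain java.util.Properties. The prefix string is the one named on this page; the tag and filter helpers below are simplified stand-ins for tagExistingKafkaConnectionProperties and filterConnectionProperties, not the Crunch implementations.

```java
import java.util.Properties;

public class ConnectionPropertySketch {
    // Prefix named in the KafkaInputFormat documentation.
    static final String PREFIX = "org.apache.crunch.kafka.connection.properties";

    // Stand-in for tagExistingKafkaConnectionProperties: prefix every key so
    // ConsumerConfig no longer sees (and warns about) unrecognized entries.
    static Properties tag(Properties connectionProperties) {
        Properties tagged = new Properties();
        for (String name : connectionProperties.stringPropertyNames()) {
            tagged.setProperty(PREFIX + "." + name, connectionProperties.getProperty(name));
        }
        return tagged;
    }

    // Stand-in for filterConnectionProperties: keep only tagged keys and strip
    // the prefix, recovering the original connection properties.
    static Properties filter(Properties props) {
        Properties filtered = new Properties();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(PREFIX + ".")) {
                filtered.setProperty(name.substring(PREFIX.length() + 1), props.getProperty(name));
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Properties conn = new Properties();
        conn.setProperty("bootstrap.servers", "broker1:9092");
        Properties tagged = tag(conn);
        Properties recovered = filter(tagged);
        System.out.println(recovered.getProperty("bootstrap.servers"));
    }
}
```

Because the tagged keys no longer collide with consumer config names, they can travel through a shared Configuration or FormatBundle and be recovered intact on the other side.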
| Modifier and Type | Method and Description |
|---|---|
| org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable> | createRecordReader(org.apache.hadoop.mapreduce.InputSplit inputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext) |
| static Properties | filterConnectionProperties(Properties props): Filters out Kafka connection properties that were tagged using generateConnectionPropertyKey. |
| org.apache.hadoop.conf.Configuration | getConf() |
| static Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> | getOffsets(org.apache.hadoop.conf.Configuration configuration): Reads the configuration to determine which topics, partitions, and offsets should be used for reading data. |
| List<org.apache.hadoop.mapreduce.InputSplit> | getSplits(org.apache.hadoop.mapreduce.JobContext jobContext) |
| void | setConf(org.apache.hadoop.conf.Configuration configuration) |
| static Properties | tagExistingKafkaConnectionProperties(Properties connectionProperties): Generates a Properties object containing the properties in connectionProperties, but with every property prefixed with "org.apache.crunch.kafka.connection.properties". |
| static void | writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle bundle): Writes the Kafka connection properties to the bundle. |
| static void | writeOffsetsToBundle(Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> offsets, FormatBundle bundle): Writes the start and end offsets for the provided topic partitions to the bundle. |
| static void | writeOffsetsToConfiguration(Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> offsets, org.apache.hadoop.conf.Configuration config): Writes the start and end offsets for the provided topic partitions to the config. |
public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext jobContext) throws IOException, InterruptedException
Specified by: getSplits in class org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable>
Throws: IOException, InterruptedException
public org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable> createRecordReader(org.apache.hadoop.mapreduce.InputSplit inputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
Specified by: createRecordReader in class org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable>
Throws: IOException, InterruptedException
public void setConf(org.apache.hadoop.conf.Configuration configuration)
Specified by: setConf in interface org.apache.hadoop.conf.Configurable
public org.apache.hadoop.conf.Configuration getConf()
Specified by: getConf in interface org.apache.hadoop.conf.Configurable
public static void writeOffsetsToBundle(Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> offsets, FormatBundle bundle)
Writes the start and end offsets for the provided topic partitions to the bundle.
Parameters:
offsets - The starting and ending offsets for the topics and partitions.
bundle - the bundle into which the information should be persisted.

public static void writeOffsetsToConfiguration(Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> offsets, org.apache.hadoop.conf.Configuration config)
Writes the start and end offsets for the provided topic partitions to the config.
Parameters:
offsets - The starting and ending offsets for the topics and partitions.
config - the config into which the information should be persisted.

public static Map<org.apache.kafka.common.TopicPartition,Pair<Long,Long>> getOffsets(org.apache.hadoop.conf.Configuration configuration)
Reads the configuration to determine which topics, partitions, and offsets should be used for reading data.
Parameters:
configuration - the configuration to derive the data to read.
Returns:
a map of TopicPartition to a pair of start and end offsets.
Throws:
IllegalStateException - if the configuration does not have the start and end offsets set properly for a partition.

public static void writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle bundle)
Writes the Kafka connection properties to the bundle.
Parameters:
connectionProperties - the Kafka connection properties
bundle - the bundle into which the information should be persisted.

public static Properties tagExistingKafkaConnectionProperties(Properties connectionProperties)
Generates a Properties object containing the properties in connectionProperties, but with every property prefixed with "org.apache.crunch.kafka.connection.properties".
Parameters:
connectionProperties - the properties to be prefixed with "org.apache.crunch.kafka.connection.properties"
Returns:
a Properties object representing Kafka connection properties

public static Properties filterConnectionProperties(Properties props)
Filters out Kafka connection properties that were tagged using generateConnectionPropertyKey.
Parameters:
props - the properties to be filtered.
Returns:
the properties whose keys were tagged using generateConnectionPropertyKey(String).

Copyright © 2017 The Apache Software Foundation. All rights reserved.