Wednesday, April 1, 2015

Playing with Flume



One thing is certain: reading all the materials and taking the courses does not prepare you for using Cloudera Manager to configure and maintain your cluster.

Case in point: using Flume.

I’ve been trying to understand how to configure and run multiple Flume agents in Cloudera using the manager.

It was pretty easy to set up one agent. I just had to replace the configuration on the default tier1 agent with the settings I created and restart. But the documentation I was reading was not clear.

I replaced tier1 with my a1 configuration of a spoolDir source using a memory channel, and it worked almost as intended. I still have some issues to work out, and I’m playing with the rollover size and interval, as my small 250K test file was being imported as many 10-row Flume files.

So I adjusted the settings and dropped in a few more files, each around 250K and maybe 2MB in total, to generate a larger file in HDFS. But after running, and even after restarting the cluster, I was surprised to see a 48MB file. What was happening in the background to generate such a large file from a few small data sources? Next step, at a later date, is to find out why…
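The many 10-row files, at least, make sense once you check the Flume User Guide: the HDFS sink’s roll triggers default to rollInterval = 30 seconds, rollSize = 1024 bytes, and rollCount = 10 events, so it was rolling a new file every 10 rows. Setting a trigger to 0 disables it; a sketch of rolling on size alone (the 64MB value is just an example, not a recommendation):

# disable time- and count-based rolls
a1.sinks.flumeHDFS.hdfs.rollInterval = 0
a1.sinks.flumeHDFS.hdfs.rollCount = 0
# roll at roughly 64MB instead of the 1024-byte default
a1.sinks.flumeHDFS.hdfs.rollSize = 67108864

Here is the single-agent configuration I ended up with: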

a1.sources = ExternalFileScrDir
a1.channels = memoryChannel
a1.sinks = flumeHDFS

# Setting the source to spool directory where the file exists
a1.sources.ExternalFileScrDir.type = spooldir
a1.sources.ExternalFileScrDir.spoolDir = /usr/local/flume/live
a1.sources.ExternalFileScrDir.deletePolicy = immediate

# Setting the channel to memory
a1.channels.memoryChannel.type = memory
# Max number of events stored in the memory channel
a1.channels.memoryChannel.capacity = 1000
# Max events per transaction; must cover the sink's hdfs.batchSize (1000 below)
a1.channels.memoryChannel.transactionCapacity = 1000

# Setting the sink to HDFS
a1.sinks.flumeHDFS.type = hdfs
a1.sinks.flumeHDFS.hdfs.path = hdfs://quickstart.cloudera/user/hive/warehouse/flumeimport
a1.sinks.flumeHDFS.hdfs.fileType = DataStream

# Write format can be Text or Writable
a1.sinks.flumeHDFS.hdfs.writeFormat = Text

#a1.sinks.flumeHDFS.hdfs.rollCount = 0
#a1.sinks.flumeHDFS.hdfs.rollInterval = 0
#a1.sinks.flumeHDFS.hdfs.rollSize = 0
a1.sinks.flumeHDFS.hdfs.batchSize = 1000

# keep only one HDFS output file open at a time
a1.sinks.flumeHDFS.hdfs.maxOpenFiles = 1

# Connect source and sink with channel
a1.sources.ExternalFileScrDir.channels = memoryChannel
a1.sinks.flumeHDFS.channel = memoryChannel


Then I assumed I could just append another configuration, “a2”, with another spoolDir source using a file channel, and both would be active, running, and waiting for files… But that’s not what happened: only the initial agent was working, because the second agent had its own name.
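Roughly what I had appended, reconstructed here as a sketch (same spoolDir and HDFS path as in the working config further down):

a2.sources = ExternalFileScrDir1
a2.channels = fileChannel
a2.sinks = flumeHDFS1

a2.sources.ExternalFileScrDir1.type = spooldir
a2.sources.ExternalFileScrDir1.spoolDir = /usr/local/flume/files
a2.sources.ExternalFileScrDir1.channels = fileChannel

a2.channels.fileChannel.type = file

a2.sinks.flumeHDFS1.type = hdfs
a2.sinks.flumeHDFS1.hdfs.path = hdfs://quickstart.cloudera/user/hive/warehouse/flumeimport
a2.sinks.flumeHDFS1.channel = fileChannel

The agent process is launched with a single agent name (a1 in my case, tier1 by default), so every a2.* property was simply ignored.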

After some searching it hit me: everything has to live under the single agent name the service runs with. The config below worked: two flows in one agent, one with a memory channel and one with a file channel, both waiting for files.

# Initialize agent's source, channel and sink
a1.sources = ExternalFileScrDir ExternalFileScrDir1
a1.channels = memoryChannel fileChannel
a1.sinks = flumeHDFS flumeHDFS1

# Setting the source to spool directory where the file exists
a1.sources.ExternalFileScrDir.type = spooldir
a1.sources.ExternalFileScrDir.spoolDir = /usr/local/flume/live
#a1.sources.ExternalFileScrDir.deletePolicy = immediate
a1.sources.ExternalFileScrDir1.type = spooldir
a1.sources.ExternalFileScrDir1.spoolDir = /usr/local/flume/files


# Setting the first channel to memory
a1.channels.memoryChannel.type = memory

# Max number of events stored in the channel
a1.channels.memoryChannel.capacity = 10000
# Max events per transaction
a1.channels.memoryChannel.transactionCapacity = 5000

# Setting the second channel to file (durable, checkpointed to disk)
a1.channels.fileChannel.type = file
a1.channels.fileChannel.capacity = 10000
a1.channels.fileChannel.transactionCapacity = 5000
a1.channels.fileChannel.checkpointInterval = 3000
a1.channels.fileChannel.maxFileSize = 5242880

# Setting the sink to HDFS
a1.sinks.flumeHDFS.type = hdfs
a1.sinks.flumeHDFS.hdfs.path = hdfs://quickstart.cloudera/user/hive/warehouse/flumeimport
a1.sinks.flumeHDFS.hdfs.fileType = DataStream

a1.sinks.flumeHDFS1.type = hdfs
a1.sinks.flumeHDFS1.hdfs.path = hdfs://quickstart.cloudera/user/hive/warehouse/flumeimport
a1.sinks.flumeHDFS1.hdfs.fileType = DataStream

# Write format can be text or writable
a1.sinks.flumeHDFS.hdfs.writeFormat = Text
a1.sinks.flumeHDFS1.hdfs.writeFormat = Text

# earlier experiment: disable the time-based roll and let idleTimeout
# close the file after 10 minutes of inactivity
#a1.sinks.flumeHDFS.hdfs.rollInterval = 0
#a1.sinks.flumeHDFS.hdfs.idleTimeout = 600

# disable all roll triggers on flumeHDFS (flumeHDFS1 is left on the defaults)
a1.sinks.flumeHDFS.hdfs.rollCount = 0
a1.sinks.flumeHDFS.hdfs.rollInterval = 0
a1.sinks.flumeHDFS.hdfs.rollSize = 0
a1.sinks.flumeHDFS.hdfs.batchSize = 1000

# keep only one HDFS output file open at a time
a1.sinks.flumeHDFS.hdfs.maxOpenFiles = 1

# Connect source and sink with channel
a1.sources.ExternalFileScrDir.channels = memoryChannel
a1.sinks.flumeHDFS.channel = memoryChannel
a1.sources.ExternalFileScrDir1.channels = fileChannel
a1.sinks.flumeHDFS1.channel = fileChannel
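Note that with all three roll triggers set to 0, the flumeHDFS sink never closes its open .tmp file on its own, which may be part of my file-size mystery. One thing I want to try (an untested sketch) is bringing back idleTimeout so the file is closed once the spool directory goes quiet:

a1.sinks.flumeHDFS.hdfs.rollCount = 0
a1.sinks.flumeHDFS.hdfs.rollInterval = 0
a1.sinks.flumeHDFS.hdfs.rollSize = 0
# close the open file after 10 minutes with no new events
a1.sinks.flumeHDFS.hdfs.idleTimeout = 600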



I’m still fine-tuning the file sizes and debugging the “channel full” errors, though some of this may be down to my tiny VM instance and the limited resources on my laptop.
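If it’s the memory channel that’s filling up, the usual knobs are capacity, transactionCapacity, and byteCapacity; a sketch with guessed values sized for my small VM, not recommendations:

# give the memory channel more headroom (example values only)
a1.channels.memoryChannel.capacity = 100000
a1.channels.memoryChannel.transactionCapacity = 10000
# cap the total event bytes held in memory at roughly 20MB
a1.channels.memoryChannel.byteCapacity = 20000000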
