mr twitter #1


This connects to Hue and submits a MapReduce (MR) job to Hadoop to analyze tweets.

Map Reduce Sample Project

This is a set of projects for demonstrating and testing a Lambda architecture mixing Kafka, Hadoop, Spark, Redis and node.js by performing Twitter sentiment analysis.

For now, this MR job is just a dummy charm that doesn't do anything. The goal is to make it more powerful over time so that it actually runs a MapReduce job on tweets to produce a sentiment analysis.
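
To give an idea of what such a job could look like, here is a minimal sketch of a map step and a reduce step written as shell filters, in the style of a streaming job. It is not the code of this charm: the function names, the word lists and the three labels are all made up for illustration, and a real sentiment analysis would need a proper lexicon or model.

```shell
#!/usr/bin/env bash
# Hypothetical map step: reads one tweet per line on stdin and emits
# "<label><TAB>1", where label is positive, negative or neutral.
map_sentiment() {
    awk '
    BEGIN {
        # Tiny word lists, for illustration only.
        split("good great love happy", p, " "); for (i in p) pos[p[i]] = 1
        split("bad awful hate sad", n, " ");    for (i in n) neg[n[i]] = 1
    }
    {
        score = 0
        line = tolower($0)
        gsub(/[^a-z ]/, " ", line)      # drop punctuation and digits
        m = split(line, words, " ")
        for (i = 1; i <= m; i++) {
            if (words[i] in pos) score++
            if (words[i] in neg) score--
        }
        label = (score > 0) ? "positive" : ((score < 0) ? "negative" : "neutral")
        printf "%s\t1\n", label
    }'
}

# Hypothetical reduce step: sums the counts per label and prints them
# sorted by label.
reduce_counts() {
    awk -F '\t' '{ count[$1] += $2 }
                 END { for (k in count) printf "%s\t%d\n", k, count[k] }' | sort
}

# Example (tweets.txt is a placeholder file with one tweet per line):
#   map_sentiment < tweets.txt | sort | reduce_counts
```

The sort between the two steps stands in for the shuffle phase that Hadoop performs between mappers and reducers.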


On the Canonical side, the maintainer will be Samuel Cozannet.

Preferred deployment method

This project should be consumed via Juju through the deployment of a bundle.

The instructions below nevertheless explain how to use it in a non-automated environment.

Hadoop Installation


Running the project

Starting all elements

  1. Start the Kafka Server (Broker)

1.1. Manually

See the related project for more information. To start it manually you can do:

:~# cd /opt/kafka
:~# ./bin/ /opt/kafka/config/

This makes sure you have a broker ready to receive the stream of data from Twitter.

The log output should end like this:

root@kafka-0:/opt/kafka# ./bin/ /opt/kafka/config/
[2014-10-20 07:55:59,468] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-10-20 07:56:00,287] INFO Found clean shutdown file. Skipping recovery for all logs in data directory '/tmp/kafka-logs' (kafka.log.LogManager)
[2014-10-20 07:56:00,289] INFO Loading log '' (kafka.log.LogManager)
[2014-10-20 07:56:00,394] INFO Completed load of log with log end offset 369524 (kafka.log.Log)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
[2014-10-20 07:56:00,475] INFO Loading log '' (kafka.log.LogManager)
[2014-10-20 07:56:00,481] INFO Completed load of log with log end offset 647391 (kafka.log.Log)
[2014-10-20 07:56:00,483] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-10-20 07:56:00,489] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-10-20 07:56:00,543] INFO Awaiting socket connections on (
[2014-10-20 07:56:00,544] INFO [Socket Server on Broker 0], Started (
[2014-10-20 07:56:00,827] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-10-20 07:56:00,887] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-10-20 07:56:01,614] INFO Registered broker 0 at path /brokers/ids/0 with address ip-1XXXXXXXXXXXXXX-compute.internal:9092. (kafka.utils.ZkUtils$)
[2014-10-20 07:56:01,691] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-10-20 07:56:01,758] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-10-20 07:56:02,792] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [,0],[,1] (kafka.server.ReplicaFetcherManager)
[2014-10-20 07:56:03,255] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [,0],[,1] (kafka.server.ReplicaFetcherManager)
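
If you start the broker from a script rather than by hand, the readiness line in the log above can be checked automatically. The helper below is a minimal sketch and not part of this project: the name broker_started is made up, and it only assumes that the broker prints the "[Kafka Server N], started" line shown above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads broker log text on stdin and succeeds only
# if it contains the "[Kafka Server N], started" line.
broker_started() {
    grep -Eq '\[Kafka Server [0-9]+\], started \(kafka\.server\.KafkaServer\)'
}

# Example (broker.log is a placeholder for wherever you saved the output):
#   broker_started < broker.log && echo "broker is ready"
```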

1.2. As a service

Assuming this was deployed with Juju, Kafka would be a daemon started automatically:

root@kafka-0:~# service kafka <start | stop | restart>

Note that this requires your ZooKeeper cluster to be up and running.
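
A quick way to check that ZooKeeper is reachable before starting Kafka is to send it the "ruok" command, to which a healthy server answers "imok". The sketch below assumes that nc (netcat) is installed and that ZooKeeper listens on its usual client port 2181. The function name and the hostname in the example are placeholders, and newer ZooKeeper releases may require this command to be enabled explicitly.

```shell
#!/usr/bin/env bash
# Hypothetical helper: asks a ZooKeeper server "ruok" and succeeds only
# if it answers "imok". Any connection problem counts as a failure.
zk_is_ok() {
    local host="$1" port="${2:-2181}" reply
    reply=$(printf 'ruok' | nc -w 2 "$host" "$port" 2>/dev/null || true)
    [ "$reply" = "imok" ]
}

# Example ("zookeeper-0" is a placeholder hostname):
#   zk_is_ok zookeeper-0 && service kafka start
```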

  2. Start the Kafka Producer

2.1. Manually

In Kafka terminology, the Producer is your "data source collection hub": the primary Kafka node that connects to your raw data source, in our case a Twitter Streaming API feed.

For this we use an old version of which we refactored a little bit (see

See the related project for more information. Assuming this was deployed with Juju on the same node as your Kafka Server, you can start it from the command line with:

:~# cd /opt/kafka-twitter
:~# ./gradlew run -Pargs="/opt/kafka-twitter/conf/producer.conf"

At some point you will see a [75% - run] notification, along with lines mentioning that you are connected to Twitter.

The output should then be:

root@kafka-0:/opt/kafka-twitter# ./gradlew run -Pargs="/opt/kafka-twitter/conf/producer.conf"
:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
5854 [Twitter Stream consumer-1[initializing]] INFO twitter4j.TwitterStreamImpl - Establishing connection.
12963 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl - Connection established.
12963 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl - Receiving status stream.
> Building 75% > :run

2.2. As a service

In the latest version of the project this has been converted to a service, which you can start with:

root@kafka-0:~# service kafka-twitter <start | stop | restart>

This requires a Kafka Broker to be running, but it doesn't have to be on the same node.

2.3. Testing

If you want to check that it really works, look at the log segments Kafka writes under /tmp/kafka-logs:

ubuntu@kafka-0:~$ ls -la /tmp/kafka-logs/
total 2260880
drwxr-xr-x 2 root root      4096 Oct 16 14:50 .
drwxr-xr-x 4 root root      4096 Oct 20 08:01 ..
-rw-r--r-- 1 root root    734256 Oct 20 08:01 00000000000000000000.index
-rw-r--r-- 1 root root 536871014 Oct 15 15:00 00000000000000000000.log
-rw-r--r-- 1 root root    728016 Oct 20 08:01 00000000000000151393.index
-rw-r--r-- 1 root root 536871292 Oct 16 12:16 00000000000000151393.log
-rw-r--r-- 1 root root    732488 Oct 20 08:01 00000000000000302151.index
-rw-r--r-- 1 root root 536877126 Oct 16 14:00 00000000000000302151.log
-rw-r--r-- 1 root root    732600 Oct 20 08:01 00000000000000454244.index
-rw-r--r-- 1 root root 536874264 Oct 16 14:50 00000000000000454244.log
-rw-r--r-- 1 root root    224728 Oct 20 08:01 00000000000000605240.index
-rw-r--r-- 1 root root 164450494 Oct 20 08:01 00000000000000605240.log

As you can see, each log segment grows to 512 MB and is then rotated. However, the old segments are kept, so keep an eye on disk usage. You can change that in the Kafka configuration (see the kafka-twitter project).
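
To keep track of how much space the segments take, something like the sketch below can be run from time to time. The function name is made up for illustration. In Kafka, the segment size and retention are normally driven by broker settings such as log.segment.bytes and log.retention.hours, but check the configuration shipped with your Kafka version before relying on those names.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints how many *.log segment files a directory
# tree holds and how many bytes they take in total.
segment_usage() {
    local dir="$1"
    find "$dir" -type f -name '*.log' -exec ls -l {} + |
        awk '{ n++; sum += $5 } END { printf "%d segments, %.0f bytes\n", n, sum }'
}

# Example: segment_usage /tmp/kafka-logs
```

For the listing above, this would report 5 segments and 2311944190 bytes.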

  3. Start the MapReduce job and grep for the output

3.1. Manually


3.2. As a service


Editing the code


Troubleshooting / FAQ

Including Java version

At first run, Maven would not compile because of a Java versioning problem. This was fixed by adding


to the original pom.xml.

Include java-util

While doing some testing on the Java file, we had to use java-util. This code is not used anymore but we kept the dependency just in case in the pom.xml file:


Http Components Versioning

This may be the root cause of the NodeNotifier not working. The Java Httpcomponents library has evolved a lot since this project was written. We had to update the versioning of the pom.xml file:


HTTP Libraries not updated on Storm Workers

By default, Hortonworks Storm Workers load version 4.1.1 of HTTP Components. Run the following commands from your computer (or run the quoted commands locally on each node, which is pretty straightforward):

:~$ juju run --service=storm-worker "wget -O /usr/lib/storm/lib/httpcore-4.3.2.jar"
:~$ juju run --service=storm-worker "wget -O /usr/lib/storm/lib/httpclient-4.3.2.jar"
:~$ juju run --service=storm-worker "rm -f /usr/lib/storm/lib/httpclient-4.1.1.jar /usr/lib/storm/lib/httpcore-4.1.jar"
:~$ juju run --service=nimbus-server "wget -O /usr/lib/storm/lib/httpcore-4.3.2.jar"
:~$ juju run --service=nimbus-server "wget -O /usr/lib/storm/lib/httpclient-4.3.2.jar"
:~$ juju run --service=nimbus-server "rm -f /usr/lib/storm/lib/httpclient-4.1.1.jar /usr/lib/storm/lib/httpcore-4.1.jar"
:~$ juju run --service=storm-worker "supervisorctl stop all"
:~$ juju run --service=nimbus-server "supervisorctl restart all"
:~$ juju run --service=storm-worker "supervisorctl start all"

Then you'll have the right versions running. This has been reported as a bug in the charms and will be updated soon.
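
To double check that no old HTTP Components jar is left behind on a node, a small helper like the one below can list the offenders. This is only a sketch: the function name is made up, and it assumes the jars follow the httpcore-<version>.jar and httpclient-<version>.jar naming used in the commands above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints every httpcore/httpclient jar in a directory
# whose version differs from the wanted one (4.3.2 by default).
stale_http_jars() {
    local dir="$1" want="${2:-4.3.2}" jar
    for jar in "$dir"/httpcore-*.jar "$dir"/httpclient-*.jar; do
        [ -e "$jar" ] || continue      # glob did not match anything
        case "$(basename "$jar")" in
            httpcore-"$want".jar|httpclient-"$want".jar) ;;
            *) basename "$jar" ;;
        esac
    done
}

# Example, run on a Storm node: stale_http_jars /usr/lib/storm/lib
```

An empty output means only the wanted version is present.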

Topology failing after a few tweets

It turns out that the default netty configuration loaded with Hortonworks Storm and Juju is very picky. Add the lines below to /etc/storm/conf/storm.yaml:

storm.messaging.netty.max_retries: 300
storm.messaging.netty.max_wait_ms: 2000
storm.messaging.netty.min_wait_ms: 100
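
If you need to apply these settings on several nodes, it helps to add them in a way that can be repeated safely. The sketch below is one way to do it, with a made-up function name. It only appends a setting when no line starting with that key exists, so a key that is already present with a different value is left untouched.

```shell
#!/usr/bin/env bash
# Hypothetical helper: appends "key: value" to a YAML file unless a line
# starting with "key:" is already there, so it is safe to run repeatedly.
ensure_setting() {
    local file="$1" key="$2" value="$3"
    if ! awk -v k="$key:" 'index($0, k) == 1 { found = 1 } END { exit !found }' \
            "$file" 2>/dev/null; then
        printf '%s: %s\n' "$key" "$value" >> "$file"
    fi
}

# Example, run as root on each Storm node:
#   conf=/etc/storm/conf/storm.yaml
#   ensure_setting "$conf" storm.messaging.netty.max_retries 300
#   ensure_setting "$conf" storm.messaging.netty.max_wait_ms 2000
#   ensure_setting "$conf" storm.messaging.netty.min_wait_ms 100
```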

Failure after a few days

ZooKeeper nodes tend to fail after a few days if nothing is done. This is because ZooKeeper keeps logging information forever and doesn't clean up its logs by default. This behavior can be changed by adding this to the crontab:

0 0 * * * /usr/lib/zookeeper/bin/ -n 3

Then restart the Cron service.

root@hdp-zookeper:~# service cron restart
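
To verify that the cleanup actually runs, you can count the files ZooKeeper keeps in its data directory before and after the job. The sketch below assumes the usual snapshot.<id> and log.<id> file names. The function name is made up and the data directory depends on your zoo.cfg. Assuming -n 3 means "keep the three most recent snapshots", as it does for ZooKeeper's bundled cleanup script, the snapshot count should settle at 3 once the job has run.

```shell
#!/usr/bin/env bash
# Hypothetical helper: counts ZooKeeper snapshot and transaction log
# files below a data directory.
zk_file_counts() {
    local dir="$1" snaps logs
    snaps=$(( $(find "$dir" -type f -name 'snapshot.*' | wc -l) ))
    logs=$(( $(find "$dir" -type f -name 'log.*' | wc -l) ))
    printf 'snapshots=%d logs=%d\n' "$snaps" "$logs"
}

# Example (replace the path with the dataDir from your zoo.cfg):
#   zk_file_counts /path/to/zookeeper/data
```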

Failure to store on HDFS

The current state of the charms that deploy the Hortonworks Hadoop distribution prevents colocation of services, which is required for this example. This has been reported as a bug and should be fixed soon.


The code in this project is made available as free and open source software
under the terms and conditions of the GNU Public License. For more information,
please refer to the LICENSE text file included with this project, or visit if the license file was not included.