An Alternative to the Twitter River - Index Tweets in Elasticsearch with Logstash

For some time now I've been using the Elasticsearch Twitter river for streaming conference tweets to Elasticsearch. The river runs on an Elasticsearch node, tracks the Twitter streaming API for keywords and directly indexes the documents in Elasticsearch. As the rivers are about to be deprecated, it is time to move on to the recommended replacement: Logstash.

With Logstash, the retrieval of the Twitter data runs in a different process, possibly even on a different machine. This makes it possible to scale Logstash and Elasticsearch separately.

Installation

Installing Logstash is nearly as easy as installing Elasticsearch: you download it, unpack the archive and use the included scripts to start it. If you are fine with using the embedded Elasticsearch instance you don't even need to install Elasticsearch separately. Unlike Elasticsearch, however, Logstash won't start without a configuration file that tells it exactly what to do.

Configuration

The configuration for Logstash normally consists of three sections: the input, optional filters and the output section. There is a multitude of existing plugins available for each of them. The structure of a config file looks like this (taken from the documentation):

# This is a comment. You should use comments to describe
# parts of your configuration.
input {
  ...
}

filter {
  ...
}

output {
  ...
}

We are using the Twitter input, the elasticsearch_http output and no filters.

Twitter

As with any Twitter API interaction you need to have an account and configure the access tokens.

input {
  twitter {
    # add your data
    consumer_key => ""
    consumer_secret => ""
    oauth_token => ""
    oauth_token_secret => ""
    keywords => ["elasticsearch"]
    full_tweet => true
  }
}

You need to pass in all the credentials as well as the keywords to track. By enabling the full_tweet option you can index a lot more data; by default only a few fields are indexed and interesting information like hashtags or mentions is missing.

The Twitter river seems to use different field names than the ones sent with the raw tweets, so it doesn't seem to be possible to easily index the Logstash Twitter data alongside data created by the Twitter river. But it should be no big deal to adjust the Logstash field names with a filter, as sketched below.
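
A minimal sketch of such a filter using the mutate plugin could look like this. The field names are only hypothetical examples (I haven't compared the two formats in detail), so you would have to map the raw tweet fields to whatever names your river-created index uses:

filter {
  mutate {
    # rename fields from the raw tweet format to the names used in the
    # existing index; the pairs below are only illustrative examples
    rename => [ "text", "message", "created_at", "postDate" ]
  }
}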

Elasticsearch

There are three plugins that provide an output to Elasticsearch: elasticsearch, elasticsearch_http and elasticsearch_river. elasticsearch joins an Elasticsearch cluster either as a node or via the transport protocol, elasticsearch_http uses the HTTP API and elasticsearch_river communicates via the RabbitMQ river. The HTTP version lets you combine different versions of Logstash and Elasticsearch, which is why it is the one I am using. Note that the elasticsearch plugin also provides an option for setting the protocol to http, which also seems to work (see the sketch after the following example).

output {
  elasticsearch_http {
    host => "localhost"
    index => "conf"
    index_type => "tweet"
  }
}
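
If you prefer the elasticsearch output with the http protocol instead, a roughly equivalent configuration might look like this (a sketch; the available options differ between Logstash versions):

output {
  elasticsearch {
    # talk to Elasticsearch over HTTP instead of joining the cluster as a node
    protocol => "http"
    host => "localhost"
    index => "conf"
    index_type => "tweet"
  }
}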

In contrast to the Twitter river, the Logstash plugin does not create a special mapping for the tweets. I didn't go through all the fields, but for example the coordinates don't seem to be mapped correctly to geo_point and some fields are analyzed that probably shouldn't be (urls, usernames). If you are using those you might want to prepare your index by supplying it with a custom mapping, for example like the one sketched below.
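
A minimal sketch of such a mapping, created before starting Logstash, could look like the following. It assumes the index conf and type tweet from the output above and only covers two of the raw tweet fields, the coordinates and the user's screen_name; you would extend it with whatever fields you actually query:

curl -XPUT "http://localhost:9200/conf" -d '
{
  "mappings": {
    "tweet": {
      "properties": {
        "coordinates": {
          "properties": {
            "coordinates": { "type": "geo_point" }
          }
        },
        "user": {
          "properties": {
            "screen_name": { "type": "string", "index": "not_analyzed" }
          }
        }
      }
    }
  }
}'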

By default tweets will be pushed to Elasticsearch every second, which should be enough for any analysis. You can even think about reducing the flush frequency by increasing the idle_flush_time property.
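
For example, adding the option to the output above would flush buffered tweets only every ten seconds (assuming your Logstash version supports this option on the elasticsearch_http output):

output {
  elasticsearch_http {
    host => "localhost"
    index => "conf"
    index_type => "tweet"
    # flush buffered events every 10 seconds instead of every second
    idle_flush_time => 10
  }
}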

Running

Finally, when all of the configuration is in place you can execute Logstash using the following command (assuming the configuration is in a file twitter.conf):

bin/logstash agent -f twitter.conf

Nothing left to do but wait for the first tweets to arrive in your local instance at http://localhost:9200/conf/tweet/_search?q=*:*&pretty=true.

For the future it would be really useful to prepare a complete mapping for the tweet fields and a filter that removes some of the unused data. For now you have to check which parts of the data you want to use and prepare a mapping for those in advance.