Logging to a Database with Rsyslog

By Ron Peterson

Introduction

RSyslog extends and improves on the venerable syslogd service. It supports the standard configuration syntax of its predecessor, but offers a number of more advanced features. For example, you can construct advanced filtering expressions in addition to the simple and limiting facility.priority selectors. In addition to the usual log targets, you can also write to a number of different databases. In this article, I'm going to show you how to combine these features to capture specific information to a database. In addition, I'll show you how to use trigger functions to parse the log messages into a more structured format.

Prerequisites

Obviously you'll need to have rsyslog installed. My examples will be constructed using the packaged version of rsyslog available in the current Debian Stable (Lenny). You'll also need the plugin module for writing to PostgreSQL.

# dpkg -l "*rsyslog*" | grep ^i
ii  rsyslog        3.18.6-4  enhanced multi-threaded syslogd
ii  rsyslog-pgsql  3.18.6-4  PostgreSQL output plugin for rsyslog

I'll be using PostgreSQL version 8.4.2, built with plperl support. I'm using plperl to write my trigger functions, to take advantage of Perl's string handling.

I'm not going to go into any detail about how to get these tools installed and running, as there are any number of good resources (see below) already available to help with that.

RSyslog Configuration

My distribution puts the RSyslog configuration files in two places. It all starts with /etc/rsyslog.conf. Near the top of this file, there is a line like this, which pulls additional config files out of the rsyslog.d directory:

$IncludeConfig /etc/rsyslog.d/*.conf

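I'm going to put my custom RSyslog configuration in a file called /etc/rsyslog.d/lg.conf. We're going to use this file to do several things: load the database output module, buffer writes headed for the database, define a template for the INSERT statement, and filter for the messages we want to capture.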

Let's start with something easy - loading the database module.

$ModLoad ompgsql.so

That wasn't too bad now, was it? :)

So let's move on. The next thing we'll do is configure RSyslog to buffer the output headed for the database. There's a good section about why and how to do this in the RSyslog documentation. The spooling mechanism we're configuring here allows RSyslog to queue database writes during peak activity. I'm also including a commented line that shows you something you should not do.

$WorkDirectory /var/tmp/rsyslog/work

# This would queue _ALL_ rsyslog messages, i.e. slow them down to rate of DB ingest.
# Don't do that...
# $MainMsgQueueFileName mainq  # set file name, also enables disk mode

# We only want to queue for database writes.
$ActionQueueType LinkedList # use asynchronous processing
$ActionQueueFileName dbq    # set file name, also enables disk mode
$ActionResumeRetryCount -1   # infinite retries on insert failure

Now we're going to define an RSyslog template that we'll refer to in just a minute when we write our filter expression. This template describes how we're going to stuff log data into a database table. The %percent% quoted strings are RSyslog 'properties'. A list of the properties you might use can be found in the fine documentation. You'll see why I call this 'unparsed' in just a moment.

$template mhcUnparsedSSH, \
          "INSERT INTO unparsed_ssh \
          ( timegenerated, \
            hostname, \
            tag, \
            message ) \
          VALUES \
          ( '%timegenerated:::date-rfc3339%', \
            '%hostname%', \
            '%syslogtag%', \
            '%msg%' );", \
          stdsql

All that's left to do on the RSyslog side of things is to find some interesting data to log. I'm going to log SSH login sessions.

if $syslogtag contains 'sshd' and $hostname == 'ahost' then :ompgsql:localhost,rsyslog,rsyslog,topsecret;mhcUnparsedSSH

This all needs to be on one line. It probably helps at this juncture to look at some actual log data as it might appear in a typical log file.

Feb  2 10:51:24 ahost imapd[16376]: Logout user=ausername host=someplace.my.domain [10.1.56.8]
Feb  2 10:51:24 ahost sshd[17093]: Failed password for busername from 10.1.34.3 port 43576 ssh2
Feb  2 10:51:27 ahost imapds[17213]: Login user=cusername host=anotherplace.outside.domain [256.23.1.34]
Feb  2 10:51:27 ahost imapds[17146]: Killed (lost mailbox lock) user=dusername host=another.outside.domain [256.234.34.1]
Feb  2 10:51:27 ahost sshd[17093]: Accepted password for busername from 10.1.34.3 port 43576 ssh2

This is real log data, modified to protect the innocent, from /var/log/auth.log on one of my servers. In a standard syslog setup, this data would be captured with a configuration entry for the 'auth' facility. As you can see, it contains authorization information for both IMAP and SSH sessions. For my current purposes, I only care about SSH sessions. In a standard syslog setup, teasing this information apart can be a real pain, because you only have so many facility.priority selectors to work with. With RSyslog, you can write advanced filtering expressions to help you capture just what you want.

In my case, I want to grab all log entries where the syslog tag contains 'sshd' which originate from host 'ahost'. The 'then' portion of my expression says what to do with the data, namely, to use the ompgsql driver and the mhcUnparsedSSH template to stuff it into the 'rsyslog' database found at 'localhost' as user 'rsyslog', password 'topsecret'.
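
If you want to check that the filter fires without waiting for a real login, one option is to hand the local syslog daemon a fake message with the standard logger utility. This is only a rough sketch: the helper name fake_sshd_message, the user 'testuser' and the address 192.0.2.10 are made up for the test, and the script assumes it runs on the host named in the filter ('ahost' here).

```shell
#!/bin/bash
# Smoke test for the filter: send an sshd-style message to the local syslog
# daemon. Run it on the host named in the filter expression.
# fake_sshd_message, 'testuser' and 192.0.2.10 are hypothetical test values.
fake_sshd_message() {
  local result=$1 user=$2 ip=$3
  printf '%s password for %s from %s port 40000 ssh2' "$result" "$user" "$ip"
}

msg=$(fake_sshd_message Failed testuser 192.0.2.10)
echo "sending: $msg"

if command -v logger >/dev/null 2>&1; then
  # -t sets the syslog tag, -p the facility.priority pair.
  logger -t sshd -p auth.info "$msg" || echo "logger could not deliver the message" >&2
else
  echo "logger not found; message not sent" >&2
fi
```

Once the tables described later in this article exist, the test message should show up in the database a moment after the script runs.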

The interesting information about these connections lives in the message section, i.e., the part specified by %msg% in my template. This corresponds to all of the text after the syslog tag's colon:

Failed password for busername from 10.1.34.3 port 43576 ssh2
Accepted password for busername from 10.1.34.3 port 43576 ssh2
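
To see roughly how a log line maps onto the properties used in the template, here is a small bash sketch that splits a classic-format line into hostname, tag and message. It is an approximation for illustration only: RSyslog does its own parsing (and may, for instance, keep the space after the colon as part of the message), and split_syslog_line is a hypothetical helper, not something RSyslog provides.

```shell
#!/bin/bash
# Approximate how a classic syslog line breaks into the pieces the template
# refers to. Illustration only; rsyslog's real parser is more thorough.
# split_syslog_line is a hypothetical helper name.
split_syslog_line() {
  local line=$1
  # Layout assumed: "Mon DD HH:MM:SS host tag: message"
  local re='^([A-Z][a-z]{2} +[0-9]+ [0-9:]{8}) ([^ ]+) ([^ ]+:) (.*)$'
  if [[ $line =~ $re ]]; then
    # The timestamp (group 1) is matched but not printed here.
    printf 'hostname=%s\n'  "${BASH_REMATCH[2]}"
    printf 'syslogtag=%s\n' "${BASH_REMATCH[3]}"
    printf 'msg=%s\n'       "${BASH_REMATCH[4]}"
  else
    echo "line does not look like a classic syslog entry" >&2
    return 1
  fi
}

split_syslog_line 'Feb  2 10:51:24 ahost sshd[17093]: Failed password for busername from 10.1.34.3 port 43576 ssh2'
```

The msg line it prints is the text the trigger function later has to pick apart.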

We have our data in a database at this point. We could just stop where we are. I want to take this a little farther, though. I want to break the message text down into the parts and pieces I care about, and put it into a more structured table. So let's turn to the database side of things to see what we can do.

All Hail PostgreSQL

I'm going to create a database with two tables. The first table corresponds to the table we're referring to with our RSyslog template. The second table will be a little more structured. We will then write a trigger function for the first table. When a new row is added to our first table, this trigger function will parse the data, tease it apart, and put the constituent pieces into our second table. Our tables will look like this:

CREATE TABLE unparsed_ssh (
  timegenerated
    TIMESTAMP WITH TIME ZONE,
  hostname
    VARCHAR(12),
  tag
    TEXT,
  message
    TEXT
);

CREATE TABLE authlog (
  timegenerated
    TIMESTAMP WITH TIME ZONE,
  application_host
    VARCHAR(12),
  tag
    VARCHAR(24),
  application
    VARCHAR(24),
  origin_ip
    INET,
  username
    VARCHAR(24),
  oper
    VARCHAR(6)
    CHECK( oper IN ('login', 'logout') ),
  success
    BOOL
);

You might imagine other fields that would be interesting to have in the authlog table, but that starts to get off point.

I'm going to lay the trigger function on you all at once. I'm using plperl because, as I mentioned earlier, Perl makes short work of string manipulation. There is plenty of information about plperl in the PostgreSQL docs. Here are a few words to help you get your bearings. The $_TD->{new} variables refer to the trigger data made available to the function when it is called. You can see that these variables refer to the columns of the table that RSyslog is stuffing data into. The rest of the function simply pulls the message text apart, and then constructs an INSERT SQL statement which pushes our parsed data into our second table.

CREATE OR REPLACE FUNCTION
  parse_ssh_log()
RETURNS TRIGGER AS
$$
  my $timegenerated = $_TD->{new}{timegenerated};
  my $hostname = $_TD->{new}{hostname};
  my $tag = $_TD->{new}{tag};
  my $message = $_TD->{new}{message};

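  my $query;
  my $plan;
  my $rv;
  my $user;
  my $ip;
  my $success;

  if( $message =~ /.*?(Failed|Accepted).*?for (.*?) from (.*?) .*/ ) {
    $success = $1;
    $user = $2;
    $ip = $3;
    # Map the matched keyword onto the boolean 'success' column.
    $success = ( $success eq "Failed" ) ? "false" : "true";
    # The username and address come straight from the log message, so pass
    # them as parameters instead of pasting them into the SQL text.
    # The quoted heredoc tag keeps Perl from interpolating $1 .. $6.
    $query = <<'EOSQL';
INSERT INTO authlog ( timegenerated,
                      application_host,
                      tag,
                      application,
                      origin_ip,
                      username,
                      oper,
                      success )
VALUES ( $1,
         $2,
         $3,
         'ssh',
         $4,
         $5,
         'login',
         $6 );
EOSQL
    $plan = spi_prepare( $query, 'TIMESTAMPTZ', 'TEXT', 'TEXT',
                         'INET', 'TEXT', 'BOOL' );
    $rv = spi_exec_prepared( $plan, $timegenerated, $hostname, $tag,
                             $ip, $user, $success );
    spi_freeplan( $plan );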
    return "SKIP";
  }
  return;
$$ LANGUAGE 'plperl';

CREATE TRIGGER parse_ssh_trigger
BEFORE INSERT ON unparsed_ssh
FOR EACH ROW EXECUTE PROCEDURE parse_ssh_log();

Note that I have two different types of 'return' statement. The 'return "SKIP"' tells the trigger to throw away the original row. In other words, our ssh log entries never actually land in the first table at all. That table essentially only exists as a placeholder for our trigger function. The final return is only called if our regular expression fails to match. Since it does not SKIP the insertion, any row our function doesn't match will end up in our first table. This is a good way to check that you are capturing what you think you are.
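
Before trusting the trigger with live data, it can help to try sample messages against the pattern outside the database. The bash sketch below mimics the two outcomes described above. It is a stand-in, not the trigger itself: parse_ssh_message is a hypothetical helper, bash regular expressions have no non-greedy .*? operator, and so the pattern here assumes usernames without spaces and can disagree with the Perl version on unusual messages. The 'Connection closed' line is a made-up example of a message the pattern should not match.

```shell
#!/bin/bash
# Rough stand-in for the trigger's matching logic, for trying sample messages
# outside the database. parse_ssh_message is a hypothetical helper.
# Assumption: the username contains no spaces (bash has no non-greedy .*?).
parse_ssh_message() {
  local message=$1
  local re='(Failed|Accepted) .* for ([^ ]+) from ([^ ]+) '
  if [[ $message =~ $re ]]; then
    local success=true
    if [[ ${BASH_REMATCH[1]} == Failed ]]; then
      success=false
    fi
    # The trigger would insert into authlog and return "SKIP" at this point.
    echo "parsed: username=${BASH_REMATCH[2]} origin_ip=${BASH_REMATCH[3]} success=$success"
  else
    # The trigger would fall through, so the row stays in unparsed_ssh.
    echo "unparsed: $message"
  fi
}

parse_ssh_message 'Failed password for busername from 10.1.34.3 port 43576 ssh2'
parse_ssh_message 'Accepted password for busername from 10.1.34.3 port 43576 ssh2'
parse_ssh_message 'Connection closed by 10.1.34.3'
```

The first two messages come out as parsed, with success set to false and true respectively, and the third is reported as unparsed, which corresponds to a row that would be left behind in the first table.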


Conclusion

Why do this? Here's a real world example. In my work, it's good to be alerted about things like multiple login failures. If the same username is failing to log in from multiple different IP addresses, for example, that usually indicates someone is attempting to break into their account. If someone is successfully logging in from multiple widely separated IP addresses, it often means they have already been broken into. Unfortunately, we have multiple authentication systems, which makes it difficult to watch all of this activity. Using RSyslog, I'm pulling log data from multiple different systems - ssh, imap, ldap, etc. - into a single structured database. This enhances our forensic capabilities.

Of course, it's easy enough to simply push a bunch of syslog data to a central server to consolidate the information in a central location. That's what I'm doing here also, but rather than simply writing the log data to a file, I'm using a database. Often, 'grep' and friends are all you need. But a database lets you easily do more sophisticated queries. How would you grep a specific time interval, for example? Here's a simple example query, written as a shell script:

#! /bin/bash
export PGPASSWORD='topsecret';

psql -U rsyslog -d rsyslog -h localhost <<EOF
SELECT timegenerated, username, origin_ip, application_host,
application
FROM authlog
WHERE username = '$1' AND
      timegenerated >= CURRENT_TIMESTAMP - '24 hours'::INTERVAL
ORDER BY timegenerated DESC;
EOF

This is really the whole point of this exercise: being able to use simple SQL statements to make it easier to do more advanced reporting.
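
As a further illustration, here is a sketch of the kind of report mentioned at the start of this section: usernames with failed logins from more than one address in a recent window. The table and column names are those of the authlog table above; the function name failed_login_sql, the default of 24 hours and the threshold of two addresses are arbitrary choices for the example. The script only prints the query, so it can be inspected before it is run.

```shell
#!/bin/bash
# Sketch: list usernames with failed logins from two or more addresses in the
# last N hours. failed_login_sql and the threshold of 2 are illustrative.
failed_login_sql() {
  local hours=${1:-24}
  # Only accept a whole number, since the value is placed in the SQL text.
  if ! [[ $hours =~ ^[0-9]+$ ]]; then
    echo "hours must be a whole number" >&2
    return 1
  fi
  cat <<EOF
SELECT username,
       COUNT(DISTINCT origin_ip) AS addresses,
       COUNT(*) AS failures
FROM authlog
WHERE success = false
  AND timegenerated >= CURRENT_TIMESTAMP - '${hours} hours'::INTERVAL
GROUP BY username
HAVING COUNT(DISTINCT origin_ip) >= 2
ORDER BY failures DESC;
EOF
}

# Print the query. To run it, pipe it to psql, for example:
#   failed_login_sql 24 | PGPASSWORD='topsecret' psql -U rsyslog -d rsyslog -h localhost
failed_login_sql 24
```

Keeping the query in a function that validates its one argument avoids pasting arbitrary command-line text into the SQL.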

I hope you found this article helpful. Happy hacking.


Resources


Ron Peterson is a Network & Systems Manager at Mount Holyoke College in the happy hills of western Massachusetts. He enjoys lecturing his three small children about the maleficent influence of proprietary media codecs while they watch Homestar Runner cartoons together.


Copyright © 2010, Ron Peterson. Released under the Open Publication License unless otherwise noted in the body of the article. Linux Gazette is not produced, sponsored, or endorsed by its prior host, SSC, Inc.

Published in Issue 172 of Linux Gazette, March 2010
