Revision

  $RecipeId: lsof-socket-csv2dbi.base,v 1.3 2005/06/14 19:11:30 klm Exp $

Purpose

  Often, it is useful to load information into a relational database
  so that a wide variety of analysis queries may be implemented once
  and run multiple times. This recipe demonstrates how to prepare and
  load lsof data into a MySQL database. In particular, the focus will
  be on TCP/UDP socket information collected with the following
  command:

    $ lsof -Di -n -P -itcp -iudp -FLucRptPTn > lsof.out

  Further, the output collected by this command must be preprocessed
  using lsof-socket-out2csv.pl as follows:

    $ lsof-socket-out2csv.pl -h -f lsof.out > lsof.out.csv

  The lsof-socket-out2csv.pl script is part of "Harvest lsof socket
  data (TCP/UDP)" which is available here:

    http://webjob.sourceforge.net/Files/Recipes/webjob-run-lsof-socket.txt

Motivation

  Monitoring lsof socket information regularly promotes awareness and
  better security practices. Having a centrally managed repository of
  lsof socket information may help mitigate system anomalies more
  quickly because admins and decision makers will be able to identify
  daemons and sockets that don't conform to a given profile or policy.

Requirements

  This recipe assumes that you are familiar with lsof, Perl, and MySQL
  and that you already have a running MySQL database.

Solution

  The solution is to process lsof.out.csv with lsof_socket_csv2dbi and
  use the resulting .dbi and .sql files to import the data into MySQL.
  The following steps describe how to implement this solution.

  1. Extract lsof_socket_csv2dbi from this recipe, and set its
     permissions to mode 755.

       $ sed -e '1,/^--- lsof_socket_csv2dbi ---$/d; /^--- lsof_socket_csv2dbi ---$/,$d' lsof-socket-csv2dbi.txt > lsof_socket_csv2dbi
       $ chmod 755 lsof_socket_csv2dbi

  2. Process the data in lsof.out.csv prior to loading it into the
     database. In general, the two modifications need to be made in
     the following order:

       - Escape all backslash characters (i.e. '\') with an
         additional backslash character.
       - Replace all NULL fields with the value '\N'.

     The script you extracted in step 1 will automatically perform
     these transformations. It has the following syntax:

       Usage: lsof_socket_csv2dbi [-F] [-d db] [-h host] [-m max-rows] -f {file|-}

     where '-F' forces existing output files to be overwritten; '-d'
     specifies the name of the target database; '-h' specifies the
     name of the subject host (i.e. the system on which the data was
     collected) and causes the script to insert an extra column
     called hostname; '-m' specifies the maximum number of records
     that may exist in the destination table; and '-f' specifies the
     input file: either a regular file or stdin.

     Two output files, having the extensions .sql and .dbi, are
     produced by this script. Their names are formed by appending
     these extensions to the input filename. The SQL statements in
     the .sql file may be used to import the .dbi data into MySQL.

     Invoke lsof_socket_csv2dbi as shown below. This will produce two
     output files: lsof.out.csv.sql and lsof.out.csv.dbi.

       $ lsof_socket_csv2dbi -d host1 -f lsof.out.csv

  3. Import the .dbi data using MySQL's command-line interface as
     follows:

       $ mysql host1 < lsof.out.csv.sql

     Note: MySQL typically requires data files to be world readable.
     This implies that all intermediate directories must also be
     world searchable.

     Alternatively, mysqlimport -- a command-line interface to the
     'LOAD DATA' SQL statement -- can load the .dbi data. However,
     the destination table must exist prior to import, and its name
     must match the name of the file being loaded up to the first
     period. Thus, to load the contents of lsof.out.csv.dbi, you
     should create a table called lsof. Hint: use the CREATE
     statement in the .sql file to create the lsof table. Once the
     table has been created, use the following command to import the
     data.

       $ mysqlimport --fields-terminated-by='|' host1 lsof.out.csv.dbi

     Note: The header line in lsof.out.csv.dbi must not be imported.
     Recent versions of mysqlimport provide an --ignore-lines=N
     option for this purpose; if yours does not, remove the header
     line prior to importing the data.

  4. Run SQL queries to analyze the loaded data.
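     As a minimal sketch of this step, the shell fragment below
     builds one query and optionally runs it. It assumes the table is
     called lsof and the database host1, as in the earlier steps, and
     it uses column names from the table layout in Appendix 1. The
     build_query function and the RUN_QUERY variable are hypothetical
     helpers introduced only for this illustration; they are not part
     of the recipe.

```shell
#!/bin/sh
# Hypothetical helper: print a query that lists sockets by source port.
# $1 = maximum number of rows to return (keeps early test runs small).
build_query() {
  printf 'SELECT command, protocol, src_ip, src_port FROM lsof ORDER BY src_port LIMIT %d;\n' "$1"
}

QUERY=$(build_query 10)
echo "$QUERY"

# Assumption: the database is named host1 (see step 2). The query is
# only sent to MySQL when RUN_QUERY=1 is set in the environment.
if [ "${RUN_QUERY:-0}" = 1 ]; then
  mysql host1 -e "$QUERY"
fi
```

     Naming the columns explicitly, rather than using '*', keeps the
     output narrow, and the LIMIT value can be raised or dropped once
     the query behaves as expected.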

     The queries given in Appendix 2 show how common analysis
     questions may be implemented in SQL. You may find it useful to
     append the ORDER BY clause to the end of various queries -- it
     has the effect of sorting the output based on the specified
     field. At first, you may want to limit output with the LIMIT
     clause to ensure that the query is working as expected. To
     prevent potentially confusing line wraps, you should use a wide
     xterm, or alternatively replace '*' with a specific list of
     columns.

     The following command will extract the example queries from
     this recipe:

       $ sed -e '1,/^--- lsof-socket-queries.sql ---$/d; /^--- lsof-socket-queries.sql ---$/,$d' lsof-socket-csv2dbi.txt > lsof-socket-queries.sql

Closing Remarks

  Running lsof is much faster and more accurate than running a
  typical port scanner, and it provides more information. Centrally
  managing lsof socket information in a database and analyzing it
  with SQL queries can form the basis of a monitoring solution that
  is useful, efficient, flexible, and scalable.

Credits

  This recipe was brought to you by Klayton Monroe.

Appendix 1

--- lsof_socket_csv2dbi ---
#!/usr/bin/perl -w
######################################################################
#
# $RecipeId: lsof_socket_csv2dbi,v 1.4 2005/09/13 06:30:21 klm Exp $
#
######################################################################
#
# Copyright 2002-2005 The WebJob Project, All Rights Reserved.
#
######################################################################
#
# Purpose: Prepare lsof socket data for MySQL db import.
#
######################################################################

use strict;
use Cwd;
use File::Basename;
use Getopt::Std;

######################################################################
#
# Main Routine
#
######################################################################

  my $sProgram = basename(__FILE__);

  ####################################################################
  #
  # Validation expressions.
  #
  ####################################################################

  my $sDBRegex = qq(^[A-Za-z][A-Za-z0-9_]*\$);
  my $sDelimiterRegex = qq([,|]);
  my $sHostnameRegex = qq(^[\\w\\.-]{1,64}\$);
  my $sMaxRowsRegex = qq(^\\d+\$);

  ####################################################################
  #
  # SQL table creation parameters.
  #
  ####################################################################

  my %hTableLayout =
  (
    'hostname' => "varbinary(64) not null",
    'login'    => "varbinary(32) not null, index login_index (login)",
    'uid'      => "int unsigned, index uid_index (uid)",
    'command'  => "varbinary(64) not null, index command_index (command)",
    'ppid'     => "int unsigned",
    'pid'      => "int unsigned",
    'type'     => "varbinary(8) not null",
    'protocol' => "varbinary(8) not null",
    'state'    => "varbinary(32) default null",
    'src_ip'   => "varbinary(16) not null",
    'src_port' => "int unsigned not null, index src_port_index (src_port)",
    'dst_ip'   => "varbinary(16) default null",
    'dst_port' => "int unsigned default null"
  );

  ####################################################################
  #
  # Get Options.
  #
  ####################################################################

  my (%hOptions);

  if (!getopts('Fd:f:h:m:', \%hOptions))
  {
    Usage($sProgram);
  }

  ####################################################################
  #
  # The ForceWrite, '-F', flag is optional. Default value is zero.
  #
  ####################################################################

  my $sForceWrite = (exists($hOptions{'F'})) ? 1 : 0;

  ####################################################################
  #
  # A Database, '-d', is optional. Default value is 'lsof'.
  #
  ####################################################################

  my $sDB = (exists($hOptions{'d'})) ?
    $hOptions{'d'} : "lsof";

  if ($sDB !~ /$sDBRegex/)
  {
    # Report the effective value; it is always defined at this point.
    print STDERR "$sProgram: Database='$sDB' Regex='$sDBRegex' Error='Invalid db name.'\n";
    exit(1);
  }

  ####################################################################
  #
  # A Hostname, '-h', is optional.
  #
  ####################################################################

  my ($sHostname);

  if (exists($hOptions{'h'}))
  {
    if ($hOptions{'h'} !~ /$sHostnameRegex/)
    {
      # $sHostname is not assigned yet, so report the raw option value.
      print STDERR "$sProgram: Hostname='$hOptions{'h'}' Regex='$sHostnameRegex' Error='Invalid hostname.'\n";
      exit(1);
    }
    $sHostname = $hOptions{'h'};
  }

  ####################################################################
  #
  # A MaxRows, '-m', is optional.
  #
  ####################################################################

  my $sMaxRows = (exists($hOptions{'m'})) ? $hOptions{'m'} : 0;

  if ($sMaxRows !~ /$sMaxRowsRegex/)
  {
    print STDERR "$sProgram: MaxRows='$sMaxRows' Regex='$sMaxRowsRegex' Error='Invalid number.'\n";
    exit(1);
  }

  ####################################################################
  #
  # A Filename, '-f', is required. It can be '-' or a regular file.
  #
  ####################################################################

  my ($sFileHandle, $sFilename, $sOutFile, $sSQLFile);

  if (!exists($hOptions{'f'}))
  {
    Usage($sProgram);
    exit(1);
  }
  else
  {
    $sFilename = $hOptions{'f'};
    if (-f $sFilename)
    {
      if (!open(IN, "<", $sFilename))
      {
        print STDERR "$sProgram: Filename='$sFilename' Error='$!'\n";
        exit(1);
      }
      $sFileHandle = \*IN;
    }
    else
    {
      if ($sFilename ne '-')
      {
        print STDERR "$sProgram: Filename='$sFilename' Error='File not found.'\n";
        exit(1);
      }
      $sFilename = "lsof";
      $sFileHandle = \*STDIN;
    }
  }
  $sOutFile = (($sFilename !~ /^\//) ? cwd() . "/" : "") . $sFilename . ".dbi";
  $sSQLFile = (($sFilename !~ /^\//) ? cwd() . "/" : "") . $sFilename . ".sql";

  ##################################################################
  #
  # Check file existence when ForceWrite is disabled.
  #
  ##################################################################

  if (!$sForceWrite && -f $sSQLFile)
  {
    print STDERR "$sProgram: Filename='$sSQLFile' Error='Output file already exists.'\n";
    exit(1);
  }

  if (!$sForceWrite && -f $sOutFile)
  {
    print STDERR "$sProgram: Filename='$sOutFile' Error='Output file already exists.'\n";
    exit(1);
  }

  ##################################################################
  #
  # Process the header.
  #
  ##################################################################

  my ($sHeader, $sHeaderFieldCount, @aHeaderFields);

  if (!defined($sHeader = <$sFileHandle>))
  {
    print STDERR "$sProgram: Error='Header not defined.'\n";
    exit(1);
  }
  chomp($sHeader);
  @aHeaderFields = split(/$sDelimiterRegex/, $sHeader);
  $sHeaderFieldCount = scalar(@aHeaderFields);

  for (my $sIndex = 0; $sIndex < $sHeaderFieldCount; $sIndex++)
  {
    if (!exists($hTableLayout{$aHeaderFields[$sIndex]}))
    {
      print STDERR "$sProgram: Field='$aHeaderFields[$sIndex]' Error='Field not recognized.'\n";
      exit(1);
    }
  }
  unshift(@aHeaderFields, "hostname") if (defined($sHostname));

  ##################################################################
  #
  # Open output files.
  #
  ##################################################################

  umask(022);

  if (!open(OUT, ">", $sOutFile))
  {
    print STDERR "$sProgram: Filename='$sOutFile' Error='$!'\n";
    exit(1);
  }

  if (!open(SQL, ">", $sSQLFile))
  {
    print STDERR "$sProgram: Filename='$sSQLFile' Error='$!'\n";
    exit(1);
  }

  ##################################################################
  #
  # Print the header.
  #
  ##################################################################

  print OUT join('|', @aHeaderFields) . "\n";

  ##################################################################
  #
  # Process the data.
  #
  ##################################################################

  my ($sDataFieldCount, @aDataFields);

  while (my $sLine = <$sFileHandle>)
  {
    chomp($sLine);
    @aDataFields = split(/$sDelimiterRegex/, $sLine, -1); # A negative limit preserves trailing NULL (empty) fields.
    $sDataFieldCount = scalar(@aDataFields);
    if ($sDataFieldCount != $sHeaderFieldCount)
    {
      print STDERR "$sProgram: Line='$sLine' HeaderFieldCount='$sHeaderFieldCount' DataFieldCount='$sDataFieldCount' Error='FieldCounts don't match.'\n";
      close($sFileHandle);
      close(OUT);
      exit(1);
    }
    $sLine = (defined($sHostname)) ? join('|', ($sHostname, @aDataFields)) : join('|', (@aDataFields));
    $sLine =~ s/\\/\\\\/g; # Escape backslashes.
    $sLine =~ s/\|(?=\|)/\|\\N/g; # Add embedded NULLs. The lookahead also catches consecutive NULL fields.
    $sLine =~ s/\|$/\|\\N/g; # Add end-of-line NULLs.
    print OUT $sLine . "\n";
  }
  close($sFileHandle);
  close(OUT);

  ##################################################################
  #
  # Build the sql file.
  #
  ##################################################################

  my (@aColumns, $sCreateOptions, $sDate);

  $sDate = localtime(time);
  $sDate =~ s/ +/ /g;

  print SQL < 0) ? "MAX_ROWS = $sMaxRows" : "";

  my $sCreateSpec = join(', ', @aColumns);
  my $sInsertSpec = join(', ', @aHeaderFields);

  print SQL <