Whamcloud - gitweb
LU-13376 utils: add batching to ofd_access_log_reader 35/38035/7
authorJohn L. Hammond <jhammond@whamcloud.com>
Mon, 23 Mar 2020 15:11:46 +0000 (10:11 -0500)
committerOleg Drokin <green@whamcloud.com>
Thu, 10 Sep 2020 15:00:26 +0000 (15:00 +0000)
Add interval based batching to ofd_access_log_reader. Add option to
control the batch interval, offset within the interval, and batch
output file.

Signed-off-by: John L. Hammond <jhammond@whamcloud.com>
Change-Id: I057e9616f4ec198dbf0c7c82a93a1e45907e7a42
Reviewed-on: https://review.whamcloud.com/38035
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Gu Zheng <gzheng@ddn.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/utils/Makefile.am
lustre/utils/ofd_access_batch.c [new file with mode: 0644]
lustre/utils/ofd_access_batch.h [new file with mode: 0644]
lustre/utils/ofd_access_log_reader.c

index 0f87553..1f769b8 100644 (file)
@@ -137,6 +137,8 @@ lr_reader_SOURCES = lr_reader.c
 
 ofd_access_log_reader_SOURCES = \
        lstddef.h \
+       ofd_access_batch.c \
+       ofd_access_batch.h \
        ofd_access_log_reader.c
 
 if UTILS
diff --git a/lustre/utils/ofd_access_batch.c b/lustre/utils/ofd_access_batch.c
new file mode 100644 (file)
index 0000000..aebc940
--- /dev/null
@@ -0,0 +1,428 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ *
+ * Copyright 2020, DataDirect Networks Storage.
+ *
+ * This file is part of Lustre, http://www.lustre.org/
+ *
+ * Author: John L. Hammond <jhammond@whamcloud.com>
+ *
+ * lustre/utils/ofd_access_batch.c
+ *
+ * Access log entry batching for ofd_access_log_reader.
+ */
+#include <stdbool.h>
+#include <stddef.h>
+#include <assert.h>
+#include <malloc.h>
+#include <linux/lustre/lustre_access_log.h>
+#include <linux/lustre/lustre_fid.h>
+#include <linux/lustre/lustre_idl.h>
+#include <libcfs/util/hash.h>
+#include <libcfs/util/list.h>
+#include "lstddef.h"
+#include "ofd_access_batch.h"
+
+/* XXX Weird param order to be consistent with list_replace_init(). */
+static inline void hlist_replace_init(struct hlist_node *old_node,
+                               struct hlist_node *new_node)
+{
+       hlist_add_before(new_node, old_node);
+       hlist_del_init(old_node);
+}
+
+struct fid_hash_node {
+       struct hlist_node fhn_node;
+       struct lu_fid fhn_fid;
+};
+
+static inline bool fid_eq(const struct lu_fid *f1, const struct lu_fid *f2)
+{
+       return f1->f_seq == f2->f_seq && f1->f_oid == f2->f_oid &&
+              f1->f_ver == f2->f_ver;
+}
+
+static inline __u64 fid_flatten(const struct lu_fid *fid)
+{
+       __u64 ino;
+       __u64 seq;
+
+       if (fid_is_igif(fid)) {
+               ino = lu_igif_ino(fid);
+               return ino;
+       }
+
+       seq = fid_seq(fid);
+
+       ino = (seq << 24) + ((seq >> 24) & 0xffffff0000ULL) + fid_oid(fid);
+
+       return ino != 0 ? ino : fid_oid(fid);
+}
+
+/**
+ * map fid to 32 bit value for ino on 32bit systems.
+ */
+static inline __u32 fid_flatten32(const struct lu_fid *fid)
+{
+       __u32 ino;
+       __u64 seq;
+
+       if (fid_is_igif(fid)) {
+               ino = lu_igif_ino(fid);
+               return ino;
+       }
+
+       seq = fid_seq(fid) - FID_SEQ_START;
+
+       /* Map the high bits of the OID into higher bits of the inode number so
+        * that inodes generated at about the same time have a reduced chance
+        * of collisions. This will give a period of 2^12 = 1024 unique clients
+        * (from SEQ) and up to min(LUSTRE_SEQ_MAX_WIDTH, 2^20) = 128k objects
+        * (from OID), or up to 128M inodes without collisions for new files.
+        */
+       ino = ((seq & 0x000fffffULL) << 12) + ((seq >> 8) & 0xfffff000) +
+             (seq >> (64 - (40-8)) & 0xffffff00) +
+             (fid_oid(fid) & 0xff000fff) + ((fid_oid(fid) & 0x00fff000) << 8);
+
+       return ino != 0 ? ino : fid_oid(fid);
+}
+
+static unsigned long fid_hash(const struct lu_fid *f, unsigned int shift)
+{
+#if __BITS_PER_LONG == 32
+       return hash_long(fid_flatten32(f), shift);
+#elif __BITS_PER_LONG == 64
+       return hash_long(fid_flatten(f), shift);
+#else
+# error "Wordsize not 32 or 64"
+#endif
+}
+
+static void fhn_init(struct fid_hash_node *fhn, const struct lu_fid *fid)
+{
+       INIT_HLIST_NODE(&fhn->fhn_node);
+       fhn->fhn_fid = *fid;
+}
+
+static bool fhn_is_hashed(const struct fid_hash_node *fhn)
+{
+       return !hlist_unhashed(&fhn->fhn_node);
+}
+
+static void fhn_del_init(struct fid_hash_node *fhn)
+{
+       if (fhn_is_hashed(fhn))
+               hlist_del_init(&fhn->fhn_node);
+}
+
+static inline void fhn_replace_init(struct fid_hash_node *old_fhn,
+                               struct fid_hash_node *new_fhn)
+{
+       hlist_add_before(&new_fhn->fhn_node, &old_fhn->fhn_node);
+       hlist_del_init(&old_fhn->fhn_node);
+}
+
+void fid_hash_add(struct hlist_head *head, unsigned int shift,
+               struct fid_hash_node *fhn)
+{
+       assert(!fhn_is_hashed(fhn));
+
+       hlist_add_head(&fhn->fhn_node, &head[fid_hash(&fhn->fhn_fid, shift)]);
+}
+
+struct fid_hash_node *
+fid_hash_find(struct hlist_head *head, unsigned int shift, const struct lu_fid *fid)
+{
+       struct hlist_head *hash_list;
+       struct hlist_node *node, *next;
+       struct fid_hash_node *fhn;
+
+       hash_list = &head[fid_hash(fid, shift)];
+       hlist_for_each_entry_safe(fhn, node, next, hash_list, fhn_node) {
+               assert(fhn_is_hashed(fhn));
+
+               if (fid_eq(fid, &fhn->fhn_fid))
+                       return fhn;
+       }
+
+       return NULL;
+}
+
+struct fid_hash_node *
+fid_hash_insert(struct hlist_head *head, unsigned int shift, struct fid_hash_node *new_fhn)
+{
+       struct hlist_head *list;
+       struct hlist_node *node, *next;
+       struct fid_hash_node *old_fhn;
+
+       list = &head[fid_hash(&new_fhn->fhn_fid, shift)];
+       hlist_for_each_entry_safe(old_fhn, node, next, list, fhn_node) {
+               assert(fhn_is_hashed(old_fhn));
+
+               if (fid_eq(&old_fhn->fhn_fid, &new_fhn->fhn_fid))
+                       return old_fhn;
+       }
+
+       hlist_add_head(&new_fhn->fhn_node, list);
+
+       return new_fhn;
+}
+
+int fid_hash_init(struct hlist_head **phead, unsigned int *pshift, unsigned int shift)
+{
+       struct hlist_head *new_head;
+       unsigned int i;
+
+       new_head = malloc(sizeof(*new_head) << shift);
+       if (new_head == NULL)
+               return -1;
+
+       for (i = 0; i < (1 << shift); i++)
+               INIT_HLIST_HEAD(&new_head[i]);
+
+       *phead = new_head;
+       *pshift = shift;
+
+       return 0;
+}
+
+int fid_hash_resize(struct hlist_head **phead, unsigned int *pshift, unsigned int new_shift)
+{
+       struct hlist_head *new_head;
+       unsigned int i;
+       int rc;
+
+       if (*pshift == new_shift)
+               return 0;
+
+       rc = fid_hash_init(&new_head, &new_shift, new_shift);
+       if (rc < 0)
+               return rc;
+
+       for (i = 0; i < (1 << *pshift); i++) {
+               struct hlist_head *list = &(*phead)[i];
+               struct hlist_node *node, *next;
+               struct fid_hash_node *fhn;
+
+               hlist_for_each_entry_safe(fhn, node, next, list, fhn_node) {
+                       fhn_del_init(fhn);
+                       fid_hash_add(new_head, new_shift, fhn);
+               }
+       }
+
+       free(*phead);
+       *phead = new_head;
+       *pshift = new_shift;
+
+       return 0;
+}
+
+enum {
+       ALR_READ = 0,
+       ALR_WRITE = 1,
+};
+
+/* Entry in the batching hash. */
+struct alr_entry {
+       struct fid_hash_node alre_fid_hash_node;
+       time_t alre_time[2]; /* Not strictly needed. */
+       __u64 alre_begin[2];
+       __u64 alre_end[2];
+       __u64 alre_size[2];
+       __u64 alre_segment_count[2];
+       __u64 alre_count[2];
+       char alre_obd_name[];
+};
+
+enum {
+       ALR_BATCH_HASH_SHIFT_DEFAULT = 10,
+       ALR_BATCH_HASH_SHIFT_MAX = 30,
+};
+
+struct alr_batch {
+       struct hlist_head *alrb_hash;
+       unsigned int alrb_hash_shift;
+       unsigned int alrb_count;
+};
+
+static void alre_del_init(struct alr_entry *alre)
+{
+       fhn_del_init(&alre->alre_fid_hash_node);
+}
+
+static void alre_update(struct alr_entry *alre, time_t time, __u64 begin,
+                       __u64 end, __u32 size, __u32 segment_count, __u32 flags)
+{
+       unsigned int d = (flags & OFD_ACCESS_READ) ? ALR_READ : ALR_WRITE;
+
+       alre->alre_time[d] = max_t(time_t, alre->alre_time[d], time);
+       alre->alre_begin[d] = min_t(__u64, alre->alre_begin[d], begin);
+       alre->alre_end[d] = max_t(__u64, alre->alre_end[d], end);
+       alre->alre_size[d] += size;
+       alre->alre_segment_count[d] += segment_count;
+       alre->alre_count[d] += 1;
+}
+
+int alr_batch_add(struct alr_batch *alrb, const char *obd_name,
+               const struct lu_fid *pfid, time_t time, __u64 begin, __u64 end,
+               __u32 size, __u32 segment_count, __u32 flags)
+{
+       struct fid_hash_node fhn, *p;
+       struct alr_entry *alre;
+       int rc;
+
+       if (alrb == NULL)
+               return 0;
+
+       assert(sizeof(time_t) == sizeof(__u64));
+
+       fhn_init(&fhn, pfid);
+
+       /* Find old or insert sentinel (fhn). Replace sentinel if returned. */
+       p = fid_hash_insert(alrb->alrb_hash, alrb->alrb_hash_shift, &fhn);
+       if (p == &fhn) {
+               size_t alre_size = sizeof(*alre) + strlen(obd_name) + 1;
+
+               alre = calloc(1, alre_size);
+               if (alre == NULL) {
+                       rc = -1;
+                       goto out;
+               }
+
+               fhn_init(&alre->alre_fid_hash_node, pfid);
+               strcpy(alre->alre_obd_name, obd_name);
+               fhn_replace_init(&fhn, &alre->alre_fid_hash_node);
+               alrb->alrb_count++;
+       } else {
+               alre = container_of(p, struct alr_entry, alre_fid_hash_node);
+       }
+
+       alre_update(alre, time, begin, end, size, segment_count, flags);
+       rc = 0;
+out:
+       fhn_del_init(&fhn);
+
+       return rc;
+}
+
+/* Print, clear, and resize the batch. */
+int alr_batch_print(struct alr_batch *alrb, FILE *file)
+{
+       unsigned int i;
+       unsigned int new_hash_shift;
+       int rc = 0;
+
+       if (alrb == NULL)
+               return 0;
+
+       for (i = 0; i < (1 << alrb->alrb_hash_shift); i++) {
+               struct hlist_head *list = &alrb->alrb_hash[i];
+               struct hlist_node *node, *next;
+               struct alr_entry *alre;
+
+               hlist_for_each_entry_safe(alre, node, next, list,
+                                       alre_fid_hash_node.fhn_node) {
+                       unsigned int d;
+
+                       for (d = 0; d < 2; d++) {
+                               int rc2;
+
+                               if (alre->alre_count[d] == 0)
+                                       continue;
+
+                               /* stdio stream error state is sticky. */
+                               rc2 = fprintf(file,
+                                       "%s "DFID" %lld %llu %llu %llu %llu %llu %c\n",
+                                       alre->alre_obd_name,
+                                       PFID(&alre->alre_fid_hash_node.fhn_fid),
+                                       (long long)alre->alre_time[d],
+                                       (unsigned long long)alre->alre_begin[d],
+                                       (unsigned long long)alre->alre_end[d],
+                                       (unsigned long long)alre->alre_size[d],
+                                       (unsigned long long)alre->alre_segment_count[d],
+                                       (unsigned long long)alre->alre_count[d],
+                                       (d == ALR_READ) ? 'r' : 'w');
+                               if (rc2 < 0)
+                                       rc = rc2;
+                       }
+
+                       alre_del_init(alre);
+                       free(alre);
+               }
+       }
+
+       /* Resize hash based on previous count. */
+       new_hash_shift = alrb->alrb_hash_shift;
+
+       while (new_hash_shift < ALR_BATCH_HASH_SHIFT_MAX &&
+              (1 << new_hash_shift) < alrb->alrb_count)
+               new_hash_shift++;
+
+       fid_hash_resize(&alrb->alrb_hash, &alrb->alrb_hash_shift,
+                       new_hash_shift);
+
+       alrb->alrb_count = 0;
+
+       return rc;
+}
+
+struct alr_batch *alr_batch_create(unsigned int shift)
+{
+       struct alr_batch *alrb;
+       int rc;
+
+       if (shift == -1U)
+               shift = ALR_BATCH_HASH_SHIFT_DEFAULT;
+
+       alrb = calloc(1, sizeof(*alrb));
+       if (alrb == NULL)
+               return NULL;
+
+       rc = fid_hash_init(&alrb->alrb_hash, &alrb->alrb_hash_shift, shift);
+       if (rc < 0) {
+               free(alrb);
+               return NULL;
+       }
+
+       return alrb;
+}
+
+void alr_batch_destroy(struct alr_batch *alrb)
+{
+       unsigned int i;
+
+       if (alrb == NULL)
+               return;
+
+       for (i = 0; i < (1 << alrb->alrb_hash_shift); i++) {
+               struct hlist_head *list = &alrb->alrb_hash[i];
+               struct hlist_node *node, *next;
+               struct alr_entry *alre;
+
+               hlist_for_each_entry_safe(alre, node, next, list, alre_fid_hash_node.fhn_node) {
+                       alre_del_init(alre);
+                       free(alre);
+               }
+       }
+
+       free(alrb->alrb_hash);
+       free(alrb);
+}
diff --git a/lustre/utils/ofd_access_batch.h b/lustre/utils/ofd_access_batch.h
new file mode 100644 (file)
index 0000000..e06278f
--- /dev/null
@@ -0,0 +1,16 @@
+#ifndef _OFD_ACCESS_BATCH_H_
+#define _OFD_ACCESS_BATCH_H_
+#include <sys/types.h>
+#include <linux/types.h>
+
+struct lu_fid;
+struct alr_batch;
+
+struct alr_batch *alr_batch_create(unsigned int shift);
+void alr_batch_destroy(struct alr_batch *alrb);
+int alr_batch_add(struct alr_batch *alrb, const char *obd_name,
+               const struct lu_fid *pfid, time_t time, __u64 begin, __u64 end,
+               __u32 size, __u32 segment_count, __u32 flags);
+int alr_batch_print(struct alr_batch *alrb, FILE *file);
+
+#endif /* _OFD_ACCESS_BATCH_H_ */
index 6625241..35d4a1c 100644 (file)
 #include <sys/signalfd.h>
 #include <sys/stat.h>
 #include <sys/sysmacros.h>
+#include <sys/timerfd.h>
 #include <sys/types.h>
 #include <linux/types.h>
 #include <linux/lustre/lustre_user.h>
 #include <linux/lustre/lustre_access_log.h>
+#include "ofd_access_batch.h"
 #include "lstddef.h"
 
 /* TODO fsname filter */
@@ -82,12 +84,10 @@ static FILE *trace_file;
                        fprintf(trace_file, "TRACE "fmt, ##args);       \
        } while (0)
 
-#define DEBUG_D(x) DEBUG("%s = %d\n", #x, x)
+#define DEBUG_D(x) DEBUG("%s = %"PRIdMAX"\n", #x, (intmax_t)x)
 #define DEBUG_P(x) DEBUG("%s = %p\n", #x, x)
 #define DEBUG_S(x) DEBUG("%s = '%s'\n", #x, x)
-#define DEBUG_U(x) DEBUG("%s = %u\n", #x, x)
-#define DEBUG_U32(x) DEBUG("%s = %"PRIu32"\n", #x, x)
-#define DEBUG_U64(x) DEBUG("%s = %"PRIu64"\n", #x, x)
+#define DEBUG_U(x) DEBUG("%s = %"PRIuMAX"\n", #x, (uintmax_t)x)
 
 #define ERROR(fmt, args...) \
        fprintf(stderr, "%s: "fmt, program_invocation_short_name, ##args)
@@ -125,6 +125,9 @@ static struct alr_log *alr_log[1 << 20]; /* 20 == MINORBITS */
 static int oal_version; /* FIXME ... major version, minor version */
 static unsigned int oal_log_major;
 static unsigned int oal_log_minor_max;
+static struct alr_batch *alr_batch;
+static FILE *alr_batch_file;
+static const char *alr_batch_file_path;
 
 #define D_ALR_DEV "%s %d"
 #define P_ALR_DEV(ad) \
@@ -221,6 +224,10 @@ static int alr_log_io(int epoll_fd, struct alr_dev *ad, unsigned int mask)
                        (unsigned int)oae->oae_size,
                        (unsigned int)oae->oae_segment_count,
                        alr_flags_to_str(oae->oae_flags));
+
+               alr_batch_add(alr_batch, ad->alr_name, &oae->oae_parent_fid,
+                       oae->oae_time, oae->oae_begin, oae->oae_end,
+                       oae->oae_size, oae->oae_segment_count, oae->oae_flags);
        }
 
        return ALR_OK;
@@ -251,6 +258,8 @@ static int alr_log_add(int epoll_fd, const char *path)
        int fd = -1;
        int rc;
 
+       DEBUG_S(path);
+
        fd = open(path, O_RDONLY|O_NONBLOCK|O_CLOEXEC);
        if (fd < 0) {
                ERROR("cannot open device '%s': %s\n", path, strerror(errno));
@@ -461,7 +470,7 @@ static int alr_signal_io(int epoll_fd, struct alr_dev *sd, unsigned int mask)
        if (rc <= 0)
                return ALR_OK;
 
-       DEBUG_U32(ssi.ssi_signo);
+       DEBUG_U(ssi.ssi_signo);
        switch (ssi.ssi_signo) {
        case SIGINT:
        case SIGTERM:
@@ -471,6 +480,44 @@ static int alr_signal_io(int epoll_fd, struct alr_dev *sd, unsigned int mask)
        }
 }
 
+/* batching timerfd epoll callback. Print batched access entries to
+ * alr_batch_file. */
+static int alr_batch_timer_io(int epoll_fd, struct alr_dev *td, unsigned int mask)
+{
+       time_t now = time(NULL);
+       uint64_t expire_count;
+       ssize_t rc;
+
+       TRACE("%s\n", __func__);
+       DEBUG_D(now);
+       DEBUG_U(mask);
+
+       rc = read(td->alr_fd, &expire_count, sizeof(expire_count));
+       if (rc <= 0)
+               return ALR_OK;
+
+       DEBUG_U(expire_count);
+
+       rc = alr_batch_print(alr_batch, alr_batch_file);
+       if (rc < 0) {
+               ERROR("cannot write to '%s': %s\n",
+                       alr_batch_file_path, strerror(errno));
+               goto out;
+       }
+
+       /* FIXME: blocking write to batch file. */
+       rc = fflush(alr_batch_file);
+       if (rc < 0) {
+               ERROR("cannot write to '%s': %s\n",
+                       alr_batch_file_path, strerror(errno));
+               goto out;
+       }
+out:
+       /* Failed writes will leave alr_batch_file (pipe) in a
+        * weird state so make that fatal. */
+       return (rc < 0) ? ALR_EXIT_FAILURE : ALR_OK;
+}
+
 /* Call LUSTRE_ACCESS_LOG_IOCTL_INFO to get access log info and print
  * YAML formatted info to stdout. */
 static int alr_log_info(struct alr_log *al)
@@ -545,21 +592,41 @@ static struct alr_dev *alr_dev_create(int epoll_fd, int fd, const char *name,
        return alr;
 }
 
+void usage(void)
+{
+       printf("Usage: %s: [OPTION]...\n"
+"Discover, read, batch, and write Lustre access logs\n"
+"\n"
+"Mandatory arguments to long options are mandatory for short options too.\n"
+"  -f, --batch-file=FILE          print batch to file (default stdout)\n"
+"  -i, --batch-interval=INTERVAL  print batch every INTERVAL seconds\n"
+"  -o, --batch-offset=OFFSET      print batch at OFFSET seconds\n"
+"  -d, --debug[=FILE]             print debug messages to FILE (stderr)\n"
+"  -h, --help                     display this help and exit\n"
+"  -l, --list                     print YAML list of available access logs\n"
+"  -t, --trace[=FILE]             print trace messages to FILE (stderr)\n",
+               program_invocation_short_name);
+}
+
 int main(int argc, char *argv[])
 {
        const char ctl_path[] = "/dev/"LUSTRE_ACCESS_LOG_DIR_NAME"/control";
        struct alr_dev *alr_signal = NULL;
+       struct alr_dev *alr_batch_timer = NULL;
        struct alr_dev *alr_ctl = NULL;
+       time_t batch_interval = 0;
+       time_t batch_offset = 0;
        unsigned int m;
        int list_info = 0;
        int epoll_fd = -1;
-       int signal_fd = -1;
-       int ctl_fd = -1;
        int exit_status;
        int rc;
        int c;
 
        static struct option options[] = {
+               { .name = "batch-file", .has_arg = required_argument, .val = 'f', },
+               { .name = "batch-interval", .has_arg = required_argument, .val = 'i', },
+               { .name = "batch-offset", .has_arg = required_argument, .val = 'o', },
                { .name = "debug", .has_arg = optional_argument, .val = 'd', },
                { .name = "help", .has_arg = no_argument, .val = 'h', },
                { .name = "list", .has_arg = no_argument, .val = 'l', },
@@ -567,8 +634,25 @@ int main(int argc, char *argv[])
                { .name = NULL, },
        };
 
-       while ((c = getopt_long(argc, argv, "d::hlt::", options, NULL)) != -1) {
+       while ((c = getopt_long(argc, argv, "d::f:hi:lt::", options, NULL)) != -1) {
                switch (c) {
+               case 'f':
+                       alr_batch_file_path = optarg;
+                       break;
+               case 'i':
+                       errno = 0;
+                       batch_interval = strtoll(optarg, NULL, 0);
+                       if (batch_interval < 0 || batch_interval >= 1048576 ||
+                           errno != 0)
+                               FATAL("invalid batch interval '%s'\n", optarg);
+                       break;
+               case 'o':
+                       errno = 0;
+                       batch_offset = strtoll(optarg, NULL, 0);
+                       if (batch_offset < 0 || batch_offset >= 1048576 ||
+                           errno != 0)
+                               FATAL("invalid batch offset '%s'\n", optarg);
+                       break;
                case 'd':
                        if (optarg == NULL) {
                                debug_file = stderr;
@@ -583,7 +667,7 @@ int main(int argc, char *argv[])
 
                        break;
                case 'h':
-                       /* ... */
+                       usage();
                        exit(EXIT_SUCCESS);
                case 'l':
                        list_info = 1;
@@ -602,11 +686,29 @@ int main(int argc, char *argv[])
 
                        break;
                case '?':
-                       /* Try ... for more ... */
+                       fprintf(stderr, "Try '%s --help' for more information.\n",
+                               program_invocation_short_name);
                        exit(EXIT_FAILURE);
                }
        }
 
+       if (batch_interval > 0) {
+               alr_batch = alr_batch_create(-1);
+               if (alr_batch == NULL)
+                       FATAL("cannot create batch struct: %s\n",
+                               strerror(errno));
+       }
+
+       if (alr_batch_file_path != NULL) {
+               alr_batch_file = fopen(alr_batch_file_path, "w");
+               if (alr_batch_file == NULL)
+                       FATAL("cannot open batch file '%s': %s\n",
+                               alr_batch_file_path, strerror(errno));
+       } else {
+               alr_batch_file_path = "stdout";
+               alr_batch_file = stdout;
+       }
+
        epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        if (epoll_fd < 0)
                FATAL("cannot create epoll set: %s\n", strerror(errno));
@@ -620,7 +722,7 @@ int main(int argc, char *argv[])
        if (rc < 0)
                FATAL("cannot set process signal mask: %s\n", strerror(errno));
 
-       signal_fd = signalfd(-1, &signal_mask, SFD_NONBLOCK|SFD_CLOEXEC);
+       int signal_fd = signalfd(-1, &signal_mask, SFD_NONBLOCK|SFD_CLOEXEC);
        if (signal_fd < 0)
                FATAL("cannot create signalfd: %s\n", strerror(errno));
 
@@ -630,8 +732,39 @@ int main(int argc, char *argv[])
 
        signal_fd = -1;
 
+       /* Setup batch timer FD and add to epoll set. */
+       struct timespec now;
+       rc = clock_gettime(CLOCK_REALTIME, &now);
+       if (rc < 0)
+               FATAL("cannot read realtime clock: %s\n", strerror(errno));
+
+       int timer_fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK|TFD_CLOEXEC);
+       if (timer_fd < 0)
+               FATAL("cannot create batch timerfd: %s\n", strerror(errno));
+
+       struct itimerspec it = {
+               .it_value.tv_sec = (batch_interval > 0) ?
+                                  roundup(now.tv_sec, batch_interval) +
+                                  (batch_offset % batch_interval) :
+                                  0,
+               .it_interval.tv_sec = batch_interval,
+       };
+
+       DEBUG_D(it.it_value.tv_sec);
+
+       rc = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &it, NULL);
+       if (rc < 0)
+               FATAL("cannot arm timerfd: %s\n", strerror(errno));
+
+       alr_batch_timer = alr_dev_create(epoll_fd, timer_fd, "batch_timer",
+                                       &alr_batch_timer_io, NULL);
+       if (alr_batch_timer == NULL)
+               FATAL("cannot register batch timerfd: %s\n", strerror(errno));
+
+       timer_fd = -1;
+
        /* Open control device. */
-       ctl_fd = open(ctl_path, O_RDONLY|O_NONBLOCK|O_CLOEXEC);
+       int ctl_fd = open(ctl_path, O_RDONLY|O_NONBLOCK|O_CLOEXEC);
        if (ctl_fd < 0)
                FATAL("cannot open '%s': %s\n", ctl_path, strerror(errno));
 
@@ -715,8 +848,11 @@ out:
 
        alr_dev_free(epoll_fd, alr_ctl);
        alr_dev_free(epoll_fd, alr_signal);
+       alr_dev_free(epoll_fd, alr_batch_timer);
        close(epoll_fd);
 
+       alr_batch_destroy(alr_batch);
+
        DEBUG_D(exit_status);
 
        return exit_status;