Whamcloud - gitweb
LU-14052 ofd: support for multiple access readers 06/39906/15
authorAlex Zhuravlev <bzzz@whamcloud.com>
Tue, 15 Sep 2020 12:42:17 +0000 (15:42 +0300)
committerOleg Drokin <green@whamcloud.com>
Thu, 19 Nov 2020 11:00:22 +0000 (11:00 +0000)
ofd_access_log_reader can be passed -I, --mdt-index-filter=INDEX to
print only FIDs located on INDEX.

Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Change-Id: I9a4f09c6b7ca15623d459df17939895301a57a8b
Reviewed-on: https://review.whamcloud.com/39906
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: John L. Hammond <jhammond@whamcloud.com>
Reviewed-by: Jian Yu <yujian@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/uapi/linux/lustre/lustre_access_log.h
lustre/ofd/ofd_access_log.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_io.c
lustre/tests/sanity.sh
lustre/utils/ofd_access_log_reader.c

index 0e391cf..4972976 100644 (file)
@@ -74,6 +74,12 @@ enum {
        /* /dev/lustre-access-log/OBDNAME ioctl: populate struct
         * lustre_access_log_info_v1 for the current device. */
        LUSTRE_ACCESS_LOG_IOCTL_INFO = _IOR('O', 0x84, struct lustre_access_log_info_v1),
+
+       /* /dev/lustre-access-log/OBDNAME ioctl: only entries whose
+        * PFID MDT index is equal to arg will be added to the log. A
+        * value of 0xfffffffff ((__u32)-1) will disable filtering
+        * which is the default.  Added in V2. */
+       LUSTRE_ACCESS_LOG_IOCTL_FILTER = _IOW('O', 0x85, __u32),
 };
 
 #endif /* _LUSTRE_ACCESS_LOG_H */
index 4447de0..d7e96f5 100644 (file)
@@ -43,16 +43,24 @@ struct ofd_access_log {
        char oal_name[128]; /* lustre-OST0000 */
        struct device oal_device;
        struct cdev oal_cdev;
-       struct circ_buf oal_circ;
-       wait_queue_head_t oal_read_wait_queue;
-       spinlock_t oal_read_lock;
-       spinlock_t oal_write_lock;
-       unsigned int oal_drop_count;
+       struct rw_semaphore oal_buf_list_sem;
+       struct list_head oal_circ_buf_list;
        unsigned int oal_is_closed;
        unsigned int oal_log_size;
        unsigned int oal_entry_size;
 };
 
+struct oal_circ_buf {
+       struct list_head ocb_list;
+       spinlock_t ocb_write_lock;
+       spinlock_t ocb_read_lock;
+       struct ofd_access_log *ocb_access_log;
+       __u32 ocb_filter;
+       wait_queue_head_t ocb_read_wait_queue;
+       unsigned int ocb_drop_count;
+       struct circ_buf ocb_circ;
+};
+
 static atomic_t oal_control_event_count = ATOMIC_INIT(0);
 static DECLARE_WAIT_QUEUE_HEAD(oal_control_wait_queue);
 
@@ -105,17 +113,20 @@ static void oal_log_minor_free(int minor)
        spin_unlock(&oal_log_minor_lock);
 }
 
-static bool oal_is_empty(struct ofd_access_log *oal)
+static bool oal_is_empty(struct oal_circ_buf *ocb)
 {
-       return CIRC_CNT(oal->oal_circ.head,
-                       oal->oal_circ.tail,
+       struct ofd_access_log *oal = ocb->ocb_access_log;
+
+       return CIRC_CNT(ocb->ocb_circ.head,
+                       ocb->ocb_circ.tail,
                        oal->oal_log_size) < oal->oal_entry_size;
 }
 
-static ssize_t oal_write_entry(struct ofd_access_log *oal,
+static ssize_t oal_write_entry(struct oal_circ_buf *ocb,
                        const void *entry, size_t entry_size)
 {
-       struct circ_buf *circ = &oal->oal_circ;
+       struct ofd_access_log *oal = ocb->ocb_access_log;
+       struct circ_buf *circ = &ocb->ocb_circ;
        unsigned int head;
        unsigned int tail;
        ssize_t rc;
@@ -123,7 +134,7 @@ static ssize_t oal_write_entry(struct ofd_access_log *oal,
        if (entry_size != oal->oal_entry_size)
                return -EINVAL;
 
-       spin_lock(&oal->oal_write_lock);
+       spin_lock(&ocb->ocb_write_lock);
        head = circ->head;
        tail = READ_ONCE(circ->tail);
 
@@ -131,7 +142,7 @@ static ssize_t oal_write_entry(struct ofd_access_log *oal,
         * 1. It always leaves one free char, since a completely full
         * buffer would have head == tail, which is the same as empty. */
        if (CIRC_SPACE(head, tail, oal->oal_log_size) < oal->oal_entry_size) {
-               oal->oal_drop_count++;
+               ocb->ocb_drop_count++;
                rc = -EAGAIN;
                goto out_write_lock;
        }
@@ -143,9 +154,9 @@ static ssize_t oal_write_entry(struct ofd_access_log *oal,
        smp_store_release(&circ->head,
                        (head + oal->oal_entry_size) & (oal->oal_log_size - 1));
 
-       wake_up(&oal->oal_read_wait_queue);
+       wake_up(&ocb->ocb_read_wait_queue);
 out_write_lock:
-       spin_unlock(&oal->oal_write_lock);
+       spin_unlock(&ocb->ocb_write_lock);
 
        return rc;
 }
@@ -154,10 +165,11 @@ out_write_lock:
  * When the log is empty we return -EAGAIN if the OST is still mounted
  * and 0 otherwise.
  */
-static ssize_t oal_read_entry(struct ofd_access_log *oal,
+static ssize_t oal_read_entry(struct oal_circ_buf *ocb,
                        void *entry_buf, size_t entry_buf_size)
 {
-       struct circ_buf *circ = &oal->oal_circ;
+       struct ofd_access_log *oal = ocb->ocb_access_log;
+       struct circ_buf *circ = &ocb->ocb_circ;
        unsigned int head;
        unsigned int tail;
        ssize_t rc;
@@ -165,7 +177,7 @@ static ssize_t oal_read_entry(struct ofd_access_log *oal,
        /* XXX This method may silently truncate entries when
         * entry_buf_size is less than oal_entry_size. But that's OK
         * because you know what you are doing. */
-       spin_lock(&oal->oal_read_lock);
+       spin_lock(&ocb->ocb_read_lock);
 
        /* Memory barrier usage follows circular-buffers.txt. */
        head = smp_load_acquire(&circ->head);
@@ -190,15 +202,37 @@ static ssize_t oal_read_entry(struct ofd_access_log *oal,
                        (tail + oal->oal_entry_size) & (oal->oal_log_size - 1));
 
 out_read_lock:
-       spin_unlock(&oal->oal_read_lock);
+       spin_unlock(&ocb->ocb_read_lock);
 
        return rc;
 }
 
 static int oal_file_open(struct inode *inode, struct file *filp)
 {
-       filp->private_data = container_of(inode->i_cdev,
-                                       struct ofd_access_log, oal_cdev);
+       struct ofd_access_log *oal;
+       struct oal_circ_buf *ocb;
+
+       oal = container_of(inode->i_cdev, struct ofd_access_log, oal_cdev);
+
+       ocb = kzalloc(sizeof(*ocb), GFP_KERNEL);
+       if (!ocb)
+               return -ENOMEM;
+       ocb->ocb_circ.buf = vmalloc(oal->oal_log_size);
+       if (!ocb->ocb_circ.buf) {
+               kfree(ocb);
+               return -ENOMEM;
+       }
+
+       spin_lock_init(&ocb->ocb_write_lock);
+       spin_lock_init(&ocb->ocb_read_lock);
+       ocb->ocb_access_log = oal;
+       init_waitqueue_head(&ocb->ocb_read_wait_queue);
+
+       down_write(&oal->oal_buf_list_sem);
+       list_add(&ocb->ocb_list, &oal->oal_circ_buf_list);
+       up_write(&oal->oal_buf_list_sem);
+
+       filp->private_data = ocb;
 
        return nonseekable_open(inode, filp);
 }
@@ -207,7 +241,8 @@ static int oal_file_open(struct inode *inode, struct file *filp)
 static ssize_t oal_file_read(struct file *filp, char __user *buf, size_t count,
                        loff_t *ppos)
 {
-       struct ofd_access_log *oal = filp->private_data;
+       struct oal_circ_buf *ocb = filp->private_data;
+       struct ofd_access_log *oal = ocb->ocb_access_log;
        void *entry;
        size_t size = 0;
        int rc = 0;
@@ -223,13 +258,13 @@ static ssize_t oal_file_read(struct file *filp, char __user *buf, size_t count,
                return -ENOMEM;
 
        while (size < count) {
-               rc = oal_read_entry(oal, entry, oal->oal_entry_size);
+               rc = oal_read_entry(ocb, entry, oal->oal_entry_size);
                if (rc == -EAGAIN) {
                        if (filp->f_flags & O_NONBLOCK)
                                break;
 
-                       rc = wait_event_interruptible(oal->oal_read_wait_queue,
-                               !oal_is_empty(oal) || oal->oal_is_closed);
+                       rc = wait_event_interruptible(ocb->ocb_read_wait_queue,
+                               !oal_is_empty(ocb) || oal->oal_is_closed);
                        if (rc)
                                break;
                } else if (rc <= 0) {
@@ -255,7 +290,8 @@ static ssize_t oal_file_read(struct file *filp, char __user *buf, size_t count,
 static ssize_t oal_file_write(struct file *filp, const char __user *buf,
                        size_t count, loff_t *ppos)
 {
-       struct ofd_access_log *oal = filp->private_data;
+       struct oal_circ_buf *ocb = filp->private_data;
+       struct ofd_access_log *oal = ocb->ocb_access_log;
        void *entry;
        size_t size = 0;
        ssize_t rc = 0;
@@ -276,7 +312,7 @@ static ssize_t oal_file_write(struct file *filp, const char __user *buf,
                        break;
                }
 
-               rc = oal_write_entry(oal, entry, oal->oal_entry_size);
+               rc = oal_write_entry(ocb, entry, oal->oal_entry_size);
                if (rc <= 0)
                        break;
 
@@ -291,29 +327,32 @@ static ssize_t oal_file_write(struct file *filp, const char __user *buf,
 
 unsigned int oal_file_poll(struct file *filp, struct poll_table_struct *wait)
 {
-       struct ofd_access_log *oal = filp->private_data;
+       struct oal_circ_buf *ocb = filp->private_data;
+       struct ofd_access_log *oal = ocb->ocb_access_log;
        unsigned int mask = 0;
 
-       poll_wait(filp, &oal->oal_read_wait_queue, wait);
+       poll_wait(filp, &ocb->ocb_read_wait_queue, wait);
 
-       spin_lock(&oal->oal_read_lock);
+       spin_lock(&ocb->ocb_read_lock);
 
-       if (!oal_is_empty(oal) || oal->oal_is_closed)
+       if (!oal_is_empty(ocb) || oal->oal_is_closed)
                mask |= POLLIN;
 
-       spin_unlock(&oal->oal_read_lock);
+       spin_unlock(&ocb->ocb_read_lock);
 
        return mask;
 }
 
-static long oal_ioctl_info(struct ofd_access_log *oal, unsigned long arg)
+static long oal_ioctl_info(struct oal_circ_buf *ocb, unsigned long arg)
 {
+       struct ofd_access_log *oal = ocb->ocb_access_log;
+
        struct lustre_access_log_info_v1 __user *lali;
-       u32 entry_count = CIRC_CNT(oal->oal_circ.head,
-                               oal->oal_circ.tail,
+       u32 entry_count = CIRC_CNT(ocb->ocb_circ.head,
+                               ocb->ocb_circ.tail,
                                oal->oal_log_size) / oal->oal_entry_size;
-       u32 entry_space = CIRC_SPACE(oal->oal_circ.head,
-                               oal->oal_circ.tail,
+       u32 entry_space = CIRC_SPACE(ocb->ocb_circ.head,
+                               ocb->ocb_circ.tail,
                                oal->oal_log_size) / oal->oal_entry_size;
 
        lali = (struct lustre_access_log_info_v1 __user *)arg;
@@ -334,10 +373,10 @@ static long oal_ioctl_info(struct ofd_access_log *oal, unsigned long arg)
        if (put_user(oal->oal_entry_size, &lali->lali_entry_size))
                return -EFAULT;
 
-       if (put_user(oal->oal_circ.head, &lali->_lali_head))
+       if (put_user(ocb->ocb_circ.head, &lali->_lali_head))
                return -EFAULT;
 
-       if (put_user(oal->oal_circ.tail, &lali->_lali_tail))
+       if (put_user(ocb->ocb_circ.tail, &lali->_lali_tail))
                return -EFAULT;
 
        if (put_user(entry_space, &lali->_lali_entry_space))
@@ -346,7 +385,7 @@ static long oal_ioctl_info(struct ofd_access_log *oal, unsigned long arg)
        if (put_user(entry_count, &lali->_lali_entry_count))
                return -EFAULT;
 
-       if (put_user(oal->oal_drop_count, &lali->_lali_drop_count))
+       if (put_user(ocb->ocb_drop_count, &lali->_lali_drop_count))
                return -EFAULT;
 
        if (put_user(oal->oal_is_closed, &lali->_lali_is_closed))
@@ -358,21 +397,40 @@ static long oal_ioctl_info(struct ofd_access_log *oal, unsigned long arg)
 static long oal_file_ioctl(struct file *filp, unsigned int cmd,
                        unsigned long arg)
 {
-       struct ofd_access_log *oal = filp->private_data;
+       struct oal_circ_buf *ocb = filp->private_data;
 
        switch (cmd) {
        case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
                return LUSTRE_ACCESS_LOG_VERSION_1;
        case LUSTRE_ACCESS_LOG_IOCTL_INFO:
-               return oal_ioctl_info(oal, arg);
+               return oal_ioctl_info(ocb, arg);
+       case LUSTRE_ACCESS_LOG_IOCTL_FILTER:
+               ocb->ocb_filter = arg;
+               return 0;
        default:
                return -ENOTTY;
        }
 }
 
+static int oal_file_release(struct inode *inode, struct file *filp)
+{
+       struct oal_circ_buf *ocb = filp->private_data;
+       struct ofd_access_log *oal = ocb->ocb_access_log;
+
+       down_write(&oal->oal_buf_list_sem);
+       list_del(&ocb->ocb_list);
+       up_write(&oal->oal_buf_list_sem);
+
+       vfree(ocb->ocb_circ.buf);
+       kfree(ocb);
+
+       return 0;
+}
+
 static const struct file_operations oal_fops = {
        .owner = THIS_MODULE,
        .open = &oal_file_open,
+       .release = &oal_file_release,
        .unlocked_ioctl = &oal_file_ioctl,
        .read = &oal_file_read,
        .write = &oal_file_write,
@@ -385,7 +443,7 @@ static void oal_device_release(struct device *dev)
        struct ofd_access_log *oal = dev_get_drvdata(dev);
 
        oal_log_minor_free(MINOR(oal->oal_device.devt));
-       vfree(oal->oal_circ.buf);
+       BUG_ON(!list_empty(&oal->oal_circ_buf_list));
        kfree(oal);
 }
 
@@ -413,15 +471,8 @@ struct ofd_access_log *ofd_access_log_create(const char *ofd_name, size_t size)
        strlcpy(oal->oal_name, ofd_name, sizeof(oal->oal_name));
        oal->oal_log_size = size;
        oal->oal_entry_size = entry_size;
-       spin_lock_init(&oal->oal_write_lock);
-       spin_lock_init(&oal->oal_read_lock);
-       init_waitqueue_head(&oal->oal_read_wait_queue);
-
-       oal->oal_circ.buf = vmalloc(oal->oal_log_size);
-       if (!oal->oal_circ.buf) {
-               rc = -ENOMEM;
-               goto out_free;
-       }
+       INIT_LIST_HEAD(&oal->oal_circ_buf_list);
+       init_rwsem(&oal->oal_buf_list_sem);
 
        rc = oal_log_minor_alloc(&minor);
        if (rc < 0)
@@ -452,13 +503,13 @@ out_device_name:
 out_minor:
        oal_log_minor_free(minor);
 out_free:
-       vfree(oal->oal_circ.buf);
        kfree(oal);
 
        return ERR_PTR(rc);
 }
 
-void ofd_access(struct ofd_device *m,
+void ofd_access(const struct lu_env *env,
+               struct ofd_device *m,
                const struct lu_fid *parent_fid,
                __u64 begin, __u64 end,
                unsigned int size,
@@ -466,8 +517,9 @@ void ofd_access(struct ofd_device *m,
                int rw)
 {
        unsigned int flags = (rw == READ) ? OFD_ACCESS_READ : OFD_ACCESS_WRITE;
+       struct ofd_access_log *oal = m->ofd_access_log;
 
-       if (m->ofd_access_log && (flags & m->ofd_access_log_mask)) {
+       if (oal && (flags & m->ofd_access_log_mask)) {
                struct ofd_access_entry_v1 oae = {
                        .oae_parent_fid = *parent_fid,
                        .oae_begin = begin,
@@ -477,8 +529,26 @@ void ofd_access(struct ofd_device *m,
                        .oae_segment_count = segment_count,
                        .oae_flags = flags,
                };
-
-               oal_write_entry(m->ofd_access_log, &oae, sizeof(oae));
+               struct oal_circ_buf *ocb;
+               struct lu_seq_range range;
+               int rc;
+
+               /* learn target MDT from FID's sequence */
+               range.lsr_flags = LU_SEQ_RANGE_ANY;
+               rc = fld_server_lookup(env, m->ofd_seq_site.ss_server_fld,
+                                      fid_seq(parent_fid), &range);
+               if (unlikely(rc))
+                       CERROR("%s: can't resolve "DFID": rc=%d\n",
+                              ofd_name(m), PFID(parent_fid), rc);
+
+               down_read(&oal->oal_buf_list_sem);
+               list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list) {
+                       /* filter by MDT index if requested */
+                       if (ocb->ocb_filter == 0xffffffff ||
+                           range.lsr_index == ocb->ocb_filter)
+                               oal_write_entry(ocb, &oae, sizeof(oae));
+               }
+               up_read(&oal->oal_buf_list_sem);
        }
 }
 
@@ -492,11 +562,17 @@ void ofd_access(struct ofd_device *m,
  * The oal will be freed when the last open file handle is closed. */
 void ofd_access_log_delete(struct ofd_access_log *oal)
 {
+       struct oal_circ_buf *ocb;
+
        if (!oal)
                return;
 
        oal->oal_is_closed = 1;
-       wake_up_all(&oal->oal_read_wait_queue);
+       down_read(&oal->oal_buf_list_sem);
+       list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list) {
+               wake_up_all(&ocb->ocb_read_wait_queue);
+       }
+       up_read(&oal->oal_buf_list_sem);
        cdev_device_del(&oal->oal_cdev, &oal->oal_device);
 }
 
index 8d07cda..d7ac1da 100644 (file)
@@ -304,7 +304,7 @@ void ofd_access_log_module_exit(void);
 struct ofd_access_log;
 struct ofd_access_log *ofd_access_log_create(const char *ofd_name, size_t size);
 void ofd_access_log_delete(struct ofd_access_log *oal);
-void ofd_access(struct ofd_device *m,
+void ofd_access(const struct lu_env *env, struct ofd_device *m,
                const struct lu_fid *parent_fid, __u64 begin, __u64 end,
                unsigned int size, unsigned int segment_count, int rw);
 
index 5e7801f..dc5230a 100644 (file)
@@ -628,7 +628,7 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
        if (unlikely(rc))
                GOTO(buf_put, rc);
 
-       ofd_access(ofd,
+       ofd_access(env, ofd,
                &(struct lu_fid) {
                        .f_seq = oa->o_parent_seq,
                        .f_oid = oa->o_parent_oid,
@@ -831,7 +831,7 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
 
        ofd_read_unlock(env, fo);
 
-       ofd_access(ofd,
+       ofd_access(env, ofd,
                &(struct lu_fid) {
                        .f_seq = oa->o_parent_seq,
                        .f_oid = oa->o_parent_oid,
index 0fdeb78..91e86e0 100755 (executable)
@@ -15553,8 +15553,8 @@ test_165a() {
        (( $OST1_VERSION >= $(version_code 2.13.54) )) ||
                skip "OFD access log unsupported"
 
-       do_facet ost1 ofd_access_log_reader --debug=- --trace=- > "${trace}" &
        setup_165
+       do_facet ost1 ofd_access_log_reader --debug=- --trace=- > "${trace}" &
        sleep 5
 
        do_facet ost1 ofd_access_log_reader --list
@@ -15590,13 +15590,15 @@ test_165b() {
                skip "OFD access log unsupported"
 
        setup_165
+       do_facet ost1 ofd_access_log_reader --debug=- --trace=- > "${trace}" &
+       sleep 5
+
+       do_facet ost1 ofd_access_log_reader --list
 
        lfs setstripe -c 1 -i 0 "${file}"
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_WRONLY:w1048576c ||
                error "cannot create '${file}'"
-       do_facet ost1 ofd_access_log_reader --list
 
-       do_facet ost1 ofd_access_log_reader --debug=- --trace=- > "${trace}" &
        sleep 5
        do_facet ost1 killall -TERM ofd_access_log_reader
        wait
@@ -15632,9 +15634,12 @@ test_165b() {
        fi
 
        do_facet ost1 ofd_access_log_reader --debug=- --trace=- > "${trace}" &
+       sleep 5
+
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_RDONLY:r524288c ||
                error "cannot read '${file}'"
        sleep 5
+
        do_facet ost1 killall -TERM ofd_access_log_reader
        wait
        rc=$?
@@ -15666,6 +15671,7 @@ test_165b() {
 run_test 165b "ofd access log entries are produced and consumed"
 
 test_165c() {
+       local trace="/tmp/${tfile}.trace"
        local file="${DIR}/${tdir}/${tfile}"
 
        (( $OST1_VERSION >= $(version_code 2.13.54) )) ||
@@ -15674,6 +15680,8 @@ test_165c() {
        test_mkdir "${DIR}/${tdir}"
 
        setup_165
+       do_facet ost1 ofd_access_log_reader --debug=- --trace=- > "${trace}" &
+       sleep 5
 
        lfs setstripe -c 1 -i 0 "${DIR}/${tdir}"
 
@@ -15684,34 +15692,53 @@ test_165c() {
        done
 
        sync
-       do_facet ost1 ofd_access_log_reader --list
+
+       do_facet ost1 killall -TERM ofd_access_log_reader
+       wait
+       rc=$?
+       if ((rc != 0)); then
+               error "ofd_access_log_reader exited with rc = '${rc}'"
+       fi
+
        unlinkmany  "${file}-%d" 128
 }
 run_test 165c "full ofd access logs do not block IOs"
 
-oal_peek_entry_count() {
-       do_facet ost1 ofd_access_log_reader --list |
-               awk '$1 == "_entry_count:" { print $2; }'
+oal_get_read_count() {
+       local stats="$1"
+
+       # STATS lustre-OST0001 alr_read_count 1
+
+       do_facet ost1 cat "${stats}" |
+       awk '$1 == "STATS" && $3 == "alr_read_count" { count = $4; }
+            END { print count; }'
 }
 
-oal_expect_entry_count() {
-       local entry_count=$(oal_peek_entry_count)
-       local expect="$1"
+oal_expect_read_count() {
+       local stats="$1"
+       local count
+       local expect="$2"
+
+       # Ask ofd_access_log_reader to write stats.
+       do_facet ost1 killall -USR1 ofd_access_log_reader
+
+       # Allow some time for things to happen.
+       sleep 1
 
-       if ((entry_count == expect)); then
+       count=$(oal_get_read_count "${stats}")
+       if ((count == expect)); then
                return 0
        fi
 
-       error_noexit "bad entry count, got ${entry_count}, expected ${expect}"
-       do_facet ost1 ofd_access_log_reader --list >&2
+       error_noexit "bad read count, got ${count}, expected ${expect}"
+       do_facet ost1 cat "${stats}" >&2
        exit 1
 }
 
 test_165d() {
-       local trace="/tmp/${tfile}.trace"
+       local stats="/tmp/${tfile}.stats"
        local file="${DIR}/${tdir}/${tfile}"
        local param="obdfilter.${FSNAME}-OST0000.access_log_mask"
-       local entry_count
 
        (( $OST1_VERSION >= $(version_code 2.13.54) )) ||
                skip "OFD access log unsupported"
@@ -15719,46 +15746,95 @@ test_165d() {
        test_mkdir "${DIR}/${tdir}"
 
        setup_165
+       do_facet ost1 ofd_access_log_reader --stats="${stats}" &
+       sleep 5
+
        lfs setstripe -c 1 -i 0 "${file}"
 
        do_facet ost1 lctl set_param "${param}=rw"
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_WRONLY:w1048576c ||
                error "cannot create '${file}'"
-       oal_expect_entry_count 1
+       oal_expect_read_count "${stats}" 1
 
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_RDONLY:r1048576c ||
                error "cannot read '${file}'"
-       oal_expect_entry_count 2
+       oal_expect_read_count "${stats}" 2
 
        do_facet ost1 lctl set_param "${param}=r"
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_WRONLY:w1048576c ||
                error "cannot create '${file}'"
-       oal_expect_entry_count 2
+       oal_expect_read_count "${stats}" 2
 
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_RDONLY:r1048576c ||
                error "cannot read '${file}'"
-       oal_expect_entry_count 3
+       oal_expect_read_count "${stats}" 3
 
        do_facet ost1 lctl set_param "${param}=w"
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_WRONLY:w1048576c ||
                error "cannot create '${file}'"
-       oal_expect_entry_count 4
+       oal_expect_read_count "${stats}" 4
 
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_RDONLY:r1048576c ||
                error "cannot read '${file}'"
-       oal_expect_entry_count 4
+       oal_expect_read_count "${stats}" 4
 
        do_facet ost1 lctl set_param "${param}=0"
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_WRONLY:w1048576c ||
                error "cannot create '${file}'"
-       oal_expect_entry_count 4
+       oal_expect_read_count "${stats}" 4
 
        $MULTIOP "${file}" oO_CREAT:O_DIRECT:O_RDONLY:r1048576c ||
                error "cannot read '${file}'"
-       oal_expect_entry_count 4
+       oal_expect_read_count "${stats}" 4
+
+       do_facet ost1 killall -TERM ofd_access_log_reader
+       wait
+       rc=$?
+       if ((rc != 0)); then
+               error "ofd_access_log_reader exited with rc = '${rc}'"
+       fi
 }
 run_test 165d "ofd_access_log mask works"
 
+test_165e() {
+       local stats="/tmp/${tfile}.stats"
+       local file0="${DIR}/${tdir}-0/${tfile}"
+       local file1="${DIR}/${tdir}-1/${tfile}"
+
+       (( $OST1_VERSION >= $(version_code 2.13.54) )) ||
+               skip "OFD access log unsupported"
+
+       [[ $MDSCOUNT -lt 2 ]] && skip_env "needs >= 2 MDTs"
+
+       test_mkdir -c 1 -i 0 "${DIR}/${tdir}-0"
+       test_mkdir -c 1 -i 1 "${DIR}/${tdir}-1"
+
+       lfs setstripe -c 1 -i 0 "${file0}"
+       lfs setstripe -c 1 -i 0 "${file1}"
+
+       setup_165
+       do_facet ost1 ofd_access_log_reader -I 1 --stats="${stats}" &
+       sleep 5
+
+       $MULTIOP "${file0}" oO_CREAT:O_WRONLY:w512c ||
+               error "cannot create '${file0}'"
+       sync
+       oal_expect_read_count "${stats}" 0
+
+       $MULTIOP "${file1}" oO_CREAT:O_WRONLY:w512c ||
+               error "cannot create '${file1}'"
+       sync
+       oal_expect_read_count "${stats}" 1
+
+       do_facet ost1 killall -TERM ofd_access_log_reader
+       wait
+       rc=$?
+       if ((rc != 0)); then
+               error "ofd_access_log_reader exited with rc = '${rc}'"
+       fi
+}
+run_test 165e "ofd_access_log MDT index filter works"
+
 test_169() {
        # do directio so as not to populate the page cache
        log "creating a 10 Mb file"
index 44986f6..fd88b9a 100644 (file)
@@ -119,17 +119,20 @@ struct alr_log {
        char *alr_buf;
        size_t alr_buf_size;
        size_t alr_entry_size;
+       size_t alr_read_count;
        dev_t alr_rdev;
 };
 
 static struct alr_log *alr_log[1 << 20]; /* 20 == MINORBITS */
 static int oal_version; /* FIXME ... major version, minor version */
+static __u32 alr_filter = 0xffffffff; /* no filter by default */
 static unsigned int oal_log_major;
 static unsigned int oal_log_minor_max;
 static struct alr_batch *alr_batch;
 static FILE *alr_batch_file;
 static pthread_mutex_t alr_batch_file_mutex = PTHREAD_MUTEX_INITIALIZER;
 static const char *alr_batch_file_path;
+static const char *alr_stats_file_path;
 static int alr_print_fraction = 100;
 
 #define D_ALR_DEV "%s %d"
@@ -214,6 +217,8 @@ static int alr_log_io(int epoll_fd, struct alr_dev *ad, unsigned int mask)
 
        DEBUG("read "D_ALR_LOG", count = %zd\n", P_ALR_LOG(al), count);
 
+       al->alr_read_count += count / al->alr_entry_size;
+
        for (i = 0; i < count; i += al->alr_entry_size) {
                struct ofd_access_entry_v1 *oae =
                        (struct ofd_access_entry_v1 *)&al->alr_buf[i];
@@ -305,6 +310,12 @@ static int alr_log_add(int epoll_fd, const char *path)
                rc = 0;
                goto out;
        }
+       rc = ioctl(fd, LUSTRE_ACCESS_LOG_IOCTL_FILTER, alr_filter);
+       if (rc < 0) {
+               ERROR("cannot set filter '%s': %s\n",
+                       path, strerror(errno));
+               goto out;
+       }
 
        al = calloc(1, sizeof(*al));
        if (al == NULL)
@@ -373,6 +384,94 @@ out:
        return rc;
 }
 
+/* Call LUSTRE_ACCESS_LOG_IOCTL_INFO to get access log info and print
+ * YAML formatted info to stdout. */
+static int alr_log_info(struct alr_log *al)
+{
+       struct lustre_access_log_info_v1 lali;
+       int rc;
+
+       rc = ioctl(al->alr_dev.alr_fd, LUSTRE_ACCESS_LOG_IOCTL_INFO, &lali);
+       if (rc < 0) {
+               ERROR("cannot get info for device '%s': %s\n",
+                       al->alr_dev.alr_name, strerror(errno));
+               return -1;
+       }
+
+       printf("- name: %s\n"
+              "  version: %#x\n"
+              "  type: %#x\n"
+              "  log_size: %u\n"
+              "  entry_size: %u\n",
+              lali.lali_name,
+              lali.lali_version,
+              lali.lali_type,
+              lali.lali_log_size,
+              lali.lali_entry_size);
+
+       return 0;
+}
+
+static int alr_log_stats(FILE *file, struct alr_log *al)
+{
+       struct lustre_access_log_info_v1 lali;
+       int rc;
+
+       rc = ioctl(al->alr_dev.alr_fd, LUSTRE_ACCESS_LOG_IOCTL_INFO, &lali);
+       if (rc < 0) {
+               ERROR("cannot get info for device '%s': %s\n",
+                       al->alr_dev.alr_name, strerror(errno));
+               return -1;
+       }
+
+#define X(m) \
+       fprintf(file, "STATS %s %s %u\n", lali.lali_name, #m, lali.m)
+
+       X(_lali_head);
+       X(_lali_tail);
+       X(_lali_entry_space);
+       X(_lali_entry_count);
+       X(_lali_drop_count);
+       X(_lali_is_closed);
+#undef X
+
+       fprintf(file, "STATS %s %s %zu\n",
+               lali.lali_name, "alr_read_count", al->alr_read_count);
+
+       return 0;
+}
+
+static void alr_log_stats_all(void)
+{
+       FILE *stats_file;
+       int m;
+
+       if (alr_stats_file_path == NULL) {
+               stats_file = stderr;
+       } else if (strcmp(alr_stats_file_path, "-") == 0) {
+               stats_file = stdout;
+       } else {
+               stats_file = fopen(alr_stats_file_path, "a");
+               if (stats_file == NULL) {
+                       ERROR("cannot open '%s': %s\n",
+                             alr_stats_file_path, strerror(errno));
+                       return;
+               }
+       }
+
+       for (m = 0; m <= oal_log_minor_max; m++) {
+               if (alr_log[m] == NULL)
+                       continue;
+
+               alr_log_stats(stats_file, alr_log[m]);
+       }
+
+       if (stats_file == stdout || stats_file == stderr)
+               fflush(stats_file);
+       else
+               fclose(stats_file);
+}
+
 /* Scan /dev/lustre-access-log/ for new access log devices and add to
  * epoll set. */
 static int alr_scan(int epoll_fd)
@@ -478,6 +577,18 @@ static int alr_signal_io(int epoll_fd, struct alr_dev *sd, unsigned int mask)
        case SIGINT:
        case SIGTERM:
                return ALR_EXIT_SUCCESS;
+       case SIGUSR1:
+               alr_log_stats_all();
+
+               return ALR_OK;
+       case SIGUSR2:
+               if (debug_file == NULL)
+                       debug_file = stderr;
+
+               if (trace_file == NULL)
+                       trace_file = stderr;
+
+               return ALR_OK;
        default:
                return ALR_OK;
        }
@@ -514,46 +625,6 @@ out:
        return (rc < 0) ? ALR_EXIT_FAILURE : ALR_OK;
 }
 
-/* Call LUSTRE_ACCESS_LOG_IOCTL_INFO to get access log info and print
- * YAML formatted info to stdout. */
-static int alr_log_info(struct alr_log *al)
-{
-       struct lustre_access_log_info_v1 lali;
-       int rc;
-
-       rc = ioctl(al->alr_dev.alr_fd, LUSTRE_ACCESS_LOG_IOCTL_INFO, &lali);
-       if (rc < 0) {
-               ERROR("cannot get info for device '%s': %s\n",
-                       al->alr_dev.alr_name, strerror(errno));
-               return -1;
-       }
-
-       printf("- name: %s\n"
-              "  version: %#x\n"
-              "  type: %#x\n"
-              "  log_size: %u\n"
-              "  entry_size: %u\n"
-              "  _head: %u\n"
-              "  _tail: %u\n"
-              "  _entry_space: %u\n"
-              "  _entry_count: %u\n"
-              "  _drop_count: %u\n"
-              "  _is_closed: %u\n",
-              lali.lali_name,
-              lali.lali_version,
-              lali.lali_type,
-              lali.lali_log_size,
-              lali.lali_entry_size,
-              lali._lali_head,
-              lali._lali_tail,
-              lali._lali_entry_space,
-              lali._lali_entry_count,
-              lali._lali_drop_count,
-              lali._lali_is_closed);
-
-       return 0;
-}
-
 static struct alr_dev *alr_dev_create(int epoll_fd, int fd, const char *name,
                        int (*io)(int, struct alr_dev *, unsigned int),
                        void (*destroy)(struct alr_dev *))
@@ -595,11 +666,14 @@ void usage(void)
 "\n"
 "Mandatory arguments to long options are mandatory for short options too.\n"
 "  -f, --batch-file=FILE          print batch to file (default stdout)\n"
+"  -F, --batch-fraction=P         set batch printing fraction to P/100\n"
 "  -i, --batch-interval=INTERVAL  print batch every INTERVAL seconds\n"
 "  -o, --batch-offset=OFFSET      print batch at OFFSET seconds\n"
-"  -d, --debug[=FILE]             print debug messages to FILE (stderr)\n"
+"  -I, --mdt-index-filter=INDEX   set log MDT index filter to INDEX\n"
 "  -h, --help                     display this help and exit\n"
 "  -l, --list                     print YAML list of available access logs\n"
+"  -d, --debug[=FILE]             print debug messages to FILE (stderr)\n"
+"  -s, --stats=FILE              print stats messages to FILE (stderr)\n"
 "  -t, --trace[=FILE]             print trace messages to FILE (stderr)\n",
                program_invocation_short_name);
 }
@@ -621,17 +695,19 @@ int main(int argc, char *argv[])
 
        static struct option options[] = {
                { .name = "batch-file", .has_arg = required_argument, .val = 'f', },
+               { .name = "batch-fraction", .has_arg = required_argument, .val = 'F', },
                { .name = "batch-interval", .has_arg = required_argument, .val = 'i', },
                { .name = "batch-offset", .has_arg = required_argument, .val = 'o', },
+               { .name = "mdt-index-filter", .has_arg = required_argument, .val = 'I' },
                { .name = "debug", .has_arg = optional_argument, .val = 'd', },
                { .name = "help", .has_arg = no_argument, .val = 'h', },
-               { .name = "fraction", .has_arg = required_argument, .val = 'F', },
                { .name = "list", .has_arg = no_argument, .val = 'l', },
+               { .name = "stats", .has_arg = required_argument, .val = 's', },
                { .name = "trace", .has_arg = optional_argument, .val = 't', },
                { .name = NULL, },
        };
 
-       while ((c = getopt_long(argc, argv, "d::f:F:hi:lt::", options, NULL)) != -1) {
+       while ((c = getopt_long(argc, argv, "d::f:F:hi:I:ls:t::", options, NULL)) != -1) {
                switch (c) {
                case 'f':
                        alr_batch_file_path = optarg;
@@ -671,9 +747,15 @@ int main(int argc, char *argv[])
                        if (alr_print_fraction < 1 || alr_print_fraction > 100)
                                FATAL("invalid batch offset '%s'\n", optarg);
                        break;
+               case 'I':
+                       alr_filter = strtoll(optarg, NULL, 0);
+                       break;
                case 'l':
                        list_info = 1;
                        break;
+               case 's':
+                       alr_stats_file_path = optarg;
+                       break;
                case 't':
                        if (optarg == NULL) {
                                trace_file = stderr;
@@ -720,6 +802,8 @@ int main(int argc, char *argv[])
        sigemptyset(&signal_mask);
        sigaddset(&signal_mask, SIGINT);
        sigaddset(&signal_mask, SIGTERM);
+       sigaddset(&signal_mask, SIGUSR1);
+       sigaddset(&signal_mask, SIGUSR2);
        rc = sigprocmask(SIG_BLOCK, &signal_mask, NULL);
        if (rc < 0)
                FATAL("cannot set process signal mask: %s\n", strerror(errno));