1 // SPDX-License-Identifier: GPL-2.0
4 * This file is part of Lustre, http://www.lustre.org/
6 * OFD access logs: OST (OFD) RPC handlers log accesses by FID and
7 * PFID which are read from userspace through character device files
8 * (/dev/lustre-access-log/scratch-OST0000). Accesses are described by
9 * struct ofd_access_entry_v1. The char device implements read()
10 * (blocking and nonblocking) and poll(), along with an ioctl that
11 * returns diagnostic information on an oal device.
13 * A control device (/dev/lustre-access-log/control) supports an ioctl()
14 * plus a poll() method for oal discovery. See uses of
15 * oal_control_event_count and oal_control_wait_queue for details.
17 * oal log size and entry size are restricted to powers of 2 to
18 * support circ_buf methods. See Documentation/core-api/circular-buffers.rst
19 * in the linux tree for more information.
21 * The associated struct device (*oal_device) owns the oal. The
22 * release() method of oal_device frees the oal and releases its
23 * minor. This may seem slightly more complicated than necessary but
24 * it allows the OST to be unmounted while the oal still has open file
 * descriptors.
28 #include <linux/cdev.h>
29 #include <linux/circ_buf.h>
30 #include <linux/device.h>
32 #include <linux/idr.h>
33 #include <linux/kernel.h>
34 #include <linux/miscdevice.h>
35 #include <linux/module.h>
36 #include <linux/poll.h>
37 #include <linux/slab.h>
38 #include <linux/types.h>
39 #include <linux/uaccess.h>
40 #include <uapi/linux/lustre/lustre_idl.h>
41 #include <uapi/linux/lustre/lustre_access_log.h>
42 #include "ofd_internal.h"
/* Size of the minor-number space for oal devices: used as the count for
 * alloc_chrdev_region()/unregister_chrdev_region() and as the upper bound
 * given to idr_alloc() when a minor is allocated. */
45 OAL_DEV_COUNT = 1 << MINORBITS,
/* State of one OFD access log. Allocated by ofd_access_log_create() and
 * reachable from its embedded oal_device through dev_get_drvdata(). */
48 struct ofd_access_log {
49 char oal_name[128]; /* lustre-OST0000 */
50 struct device oal_device; /* its release() is oal_device_release() */
52 struct rw_semaphore oal_buf_list_sem; /* protects oal_circ_buf_list */
53 struct list_head oal_circ_buf_list; /* one oal_circ_buf per open file */
54 unsigned int oal_is_closed; /* set to 1 by ofd_access_log_delete() */
55 unsigned int oal_log_size; /* bytes in each circular buffer, power of 2 */
56 unsigned int oal_entry_size; /* bytes per entry, power of 2 */
/* Fields of the per-open-file circular buffer; the rest of this file
 * accesses them through struct oal_circ_buf pointers. */
60 struct list_head ocb_list; /* link in oal->oal_circ_buf_list */
61 spinlock_t ocb_write_lock; /* held by oal_write_entry() */
62 spinlock_t ocb_read_lock; /* held by oal_read_entry() and oal_file_poll() */
63 struct ofd_access_log *ocb_access_log; /* log this buffer belongs to */
65 wait_queue_head_t ocb_read_wait_queue; /* woken after each write and on delete */
66 unsigned int ocb_drop_count; /* bumped when an entry does not fit */
67 struct circ_buf ocb_circ; /* buf is vmalloc()ed in oal_file_open() */
/* Control-device state: the counter is bumped by oal_control_event_inc()
 * and compared by oal_control_file_poll() against the value each opener
 * last fetched with LUSTRE_ACCESS_LOG_IOCTL_PRESCAN. */
70 static atomic_t oal_control_event_count = ATOMIC_INIT(0);
71 static DECLARE_WAIT_QUEUE_HEAD(oal_control_wait_queue);
/* Class, major number and minor allocator shared by all oal devices. */
73 static struct class *oal_log_class;
74 static unsigned int oal_log_major;
75 static DEFINE_IDR(oal_log_minor_idr); /* TODO Use ida instead. */
76 static DEFINE_SPINLOCK(oal_log_minor_lock); /* taken around oal_log_minor_idr updates */
/* Return true if @size is an acceptable log size: a power of 2 that is at
 * least two access entries and at most 1U << 30 bytes. */
78 bool ofd_access_log_size_is_valid(unsigned int size)
80 const unsigned int size_min = 2 * sizeof(struct ofd_access_entry_v1);
81 const unsigned int size_max = 1U << 30;
86 return is_power_of_2(size) && size_min <= size && size <= size_max;
/* Record a control event and wake any task waiting on
 * oal_control_wait_queue (see oal_control_file_poll()). */
89 static void oal_control_event_inc(void)
91 atomic_inc(&oal_control_event_count);
92 wake_up(&oal_control_wait_queue);
/* Allocate an unused minor in the range starting at 0 and bounded by
 * OAL_DEV_COUNT from oal_log_minor_idr, under oal_log_minor_lock. Every
 * allocated slot stores the same dummy non-NULL pointer.
 * NOTE(review): the statements that declare minor, end the preload and
 * report the result through @pminor are not visible here — confirm. */
95 static int oal_log_minor_alloc(int *pminor)
97 void *OAL_LOG_MINOR_ALLOCED = (void *)-1;
100 idr_preload(GFP_KERNEL); /* preload with GFP_KERNEL; the allocation below uses GFP_NOWAIT under the spinlock */
101 spin_lock(&oal_log_minor_lock);
102 minor = idr_alloc(&oal_log_minor_idr, OAL_LOG_MINOR_ALLOCED, 0,
103 OAL_DEV_COUNT, GFP_NOWAIT);
104 spin_unlock(&oal_log_minor_lock);
/* Return @minor to oal_log_minor_idr, under oal_log_minor_lock. */
115 static void oal_log_minor_free(int minor)
117 spin_lock(&oal_log_minor_lock);
118 idr_remove(&oal_log_minor_idr, minor);
119 spin_unlock(&oal_log_minor_lock);
/* Return true when @ocb holds less than one complete entry, i.e. the
 * buffered byte count is below oal_entry_size. */
122 static bool oal_is_empty(struct oal_circ_buf *ocb)
124 struct ofd_access_log *oal = ocb->ocb_access_log;
126 return CIRC_CNT(ocb->ocb_circ.head,
128 oal->oal_log_size) < oal->oal_entry_size;
/* Append one entry to @ocb.
 * @entry_size is checked against oal->oal_entry_size. The write runs under
 * ocb_write_lock. If fewer than oal_entry_size bytes are free the drop
 * counter is incremented; otherwise the entry is copied in at head, head is
 * published with a release store, and tasks on ocb_read_wait_queue are
 * woken.
 * NOTE(review): the return statements and the exit from the "buffer full"
 * branch are not visible here — confirm the return values. */
131 static ssize_t oal_write_entry(struct oal_circ_buf *ocb,
132 const void *entry, size_t entry_size)
134 struct ofd_access_log *oal = ocb->ocb_access_log;
135 struct circ_buf *circ = &ocb->ocb_circ;
140 if (entry_size != oal->oal_entry_size)
143 spin_lock(&ocb->ocb_write_lock);
145 tail = READ_ONCE(circ->tail);
147 /* CIRC_SPACE() returns space available, 0..oal_log_size -
148 * 1. It always leaves one free char, since a completely full
149 * buffer would have head == tail, which is the same as empty. */
150 if (CIRC_SPACE(head, tail, oal->oal_log_size) < oal->oal_entry_size) {
151 ocb->ocb_drop_count++;
156 memcpy(&circ->buf[head], entry, entry_size);
159 /* Ensure the entry is stored before we update the head. */
160 smp_store_release(&circ->head,
161 (head + oal->oal_entry_size) & (oal->oal_log_size - 1)); /* wrap using the power-of-2 log size */
163 wake_up(&ocb->ocb_read_wait_queue);
165 spin_unlock(&ocb->ocb_write_lock);
/* oal_read_entry(): under ocb_read_lock, copy at most @entry_buf_size
 * bytes of the entry at the tail of @ocb into @entry_buf and advance the
 * tail by one full entry. With nothing buffered, rc is 0 once the oal is
 * closed and -EAGAIN otherwise. */
170 /* Read one entry from the log and return its size. Non-blocking.
171 * When the log is empty we return -EAGAIN if the OST is still mounted
174 static ssize_t oal_read_entry(struct oal_circ_buf *ocb,
175 void *entry_buf, size_t entry_buf_size)
177 struct ofd_access_log *oal = ocb->ocb_access_log;
178 struct circ_buf *circ = &ocb->ocb_circ;
183 /* XXX This method may silently truncate entries when
184 * entry_buf_size is less than oal_entry_size. But that's OK
185 * because you know what you are doing. */
186 spin_lock(&ocb->ocb_read_lock);
188 /* Memory barrier usage follows circular-buffers.rst. */
189 head = smp_load_acquire(&circ->head);
192 if (!CIRC_CNT(head, tail, oal->oal_log_size)) {
193 rc = oal->oal_is_closed ? 0 : -EAGAIN;
197 BUG_ON(CIRC_CNT(head, tail, oal->oal_log_size) < oal->oal_entry_size); /* a non-empty buffer must hold a whole entry */
199 /* Extract one entry from the buffer. */
200 rc = min_t(size_t, oal->oal_entry_size, entry_buf_size);
201 memcpy(entry_buf, &circ->buf[tail], rc);
203 /* Memory barrier usage follows circular-buffers.rst. */
204 smp_store_release(&circ->tail,
205 (tail + oal->oal_entry_size) & (oal->oal_log_size - 1));
208 spin_unlock(&ocb->ocb_read_lock);
/* open() for an oal device: allocate a private circular buffer of
 * oal_log_size bytes for this file, add it to the oal's buffer list (the
 * list ofd_access() iterates when logging), and store it in
 * filp->private_data. Returns the result of nonseekable_open().
 * NOTE(review): the allocation-failure paths are only partly visible
 * here. */
213 static int oal_file_open(struct inode *inode, struct file *filp)
215 struct ofd_access_log *oal;
216 struct oal_circ_buf *ocb;
218 oal = container_of(inode->i_cdev, struct ofd_access_log, oal_cdev);
220 ocb = kzalloc(sizeof(*ocb), GFP_KERNEL);
223 ocb->ocb_circ.buf = vmalloc(oal->oal_log_size);
224 if (!ocb->ocb_circ.buf) {
229 spin_lock_init(&ocb->ocb_write_lock);
230 spin_lock_init(&ocb->ocb_read_lock);
231 ocb->ocb_access_log = oal;
232 init_waitqueue_head(&ocb->ocb_read_wait_queue);
234 down_write(&oal->oal_buf_list_sem);
235 list_add(&ocb->ocb_list, &oal->oal_circ_buf_list);
236 up_write(&oal->oal_buf_list_sem);
238 filp->private_data = ocb;
240 return nonseekable_open(inode, filp);
/* read() for an oal device. @count is tested against oal_entry_size - 1
 * before any data is read. Entries are fetched one at a time with
 * oal_read_entry() into a kernel buffer of oal_entry_size bytes and copied
 * to @buf until @count bytes have been produced or oal_read_entry()
 * returns rc <= 0. The O_NONBLOCK flag is checked before waiting on
 * ocb_read_wait_queue for data or for the oal to be closed.
 * Returns the number of bytes copied when nonzero, otherwise rc.
 * NOTE(review): several branch bodies are not visible here. */
243 /* User buffer size must be a multiple of ofd access entry size. */
244 static ssize_t oal_file_read(struct file *filp, char __user *buf, size_t count,
247 struct oal_circ_buf *ocb = filp->private_data;
248 struct ofd_access_log *oal = ocb->ocb_access_log;
256 if (count & (oal->oal_entry_size - 1))
259 entry = kzalloc(oal->oal_entry_size, GFP_KERNEL);
263 while (size < count) {
264 rc = oal_read_entry(ocb, entry, oal->oal_entry_size);
266 if (filp->f_flags & O_NONBLOCK)
269 rc = wait_event_interruptible(ocb->ocb_read_wait_queue,
270 !oal_is_empty(ocb) || oal->oal_is_closed);
273 } else if (rc <= 0) {
274 break; /* closed or error */
276 if (copy_to_user(buf, entry, oal->oal_entry_size)) {
281 buf += oal->oal_entry_size;
282 size += oal->oal_entry_size;
288 return size ? size : rc;
/* write() for an oal device: copy entries of oal_entry_size bytes from
 * @buf one at a time through a kernel buffer and append each to this
 * file's own circular buffer with oal_write_entry().
 * Returns the number of bytes consumed when positive, otherwise rc.
 * NOTE(review): the error branches are not visible here. */
291 /* Included for test purposes. User buffer size must be a multiple of
292 * ofd access entry size. */
293 static ssize_t oal_file_write(struct file *filp, const char __user *buf,
294 size_t count, loff_t *ppos)
296 struct oal_circ_buf *ocb = filp->private_data;
297 struct ofd_access_log *oal = ocb->ocb_access_log;
305 if (count & (oal->oal_entry_size - 1))
308 entry = kzalloc(oal->oal_entry_size, GFP_KERNEL);
312 while (size < count) {
313 if (copy_from_user(entry, buf, oal->oal_entry_size)) {
318 rc = oal_write_entry(ocb, entry, oal->oal_entry_size);
322 buf += oal->oal_entry_size;
323 size += oal->oal_entry_size;
328 return size > 0 ? size : rc;
/* poll() for an oal device: register on ocb_read_wait_queue, then test
 * under ocb_read_lock whether at least one entry is buffered or the oal
 * has been closed.
 * NOTE(review): the mask bits set in that case and the return statement
 * are not visible here. */
331 static unsigned int oal_file_poll(struct file *filp,
332 struct poll_table_struct *wait)
334 struct oal_circ_buf *ocb = filp->private_data;
335 struct ofd_access_log *oal = ocb->ocb_access_log;
336 unsigned int mask = 0;
338 poll_wait(filp, &ocb->ocb_read_wait_queue, wait);
340 spin_lock(&ocb->ocb_read_lock);
342 if (!oal_is_empty(ocb) || oal->oal_is_closed)
345 spin_unlock(&ocb->ocb_read_lock);
/* Handler for LUSTRE_ACCESS_LOG_IOCTL_INFO: write the version and type
 * constants, the log's name and sizes, and this buffer's head, tail,
 * entry counts, drop count and closed flag into a userspace
 * struct lustre_access_log_info_v1, one field at a time.
 * entry_count and entry_space are expressed in whole entries.
 * NOTE(review): the assignment of lali from @uarg and the return
 * statements are not visible here. */
350 static long oal_ioctl_info(struct oal_circ_buf *ocb, void __user *uarg)
352 struct ofd_access_log *oal = ocb->ocb_access_log;
353 struct lustre_access_log_info_v1 __user *lali;
354 u32 entry_count = CIRC_CNT(ocb->ocb_circ.head,
356 oal->oal_log_size) / oal->oal_entry_size;
357 u32 entry_space = CIRC_SPACE(ocb->ocb_circ.head,
359 oal->oal_log_size) / oal->oal_entry_size;
362 BUILD_BUG_ON(sizeof(lali->lali_name) != sizeof(oal->oal_name)); /* copy_to_user() below copies the whole array */
364 if (put_user(LUSTRE_ACCESS_LOG_VERSION_1, &lali->lali_version))
367 if (put_user(LUSTRE_ACCESS_LOG_TYPE_OFD, &lali->lali_type))
370 if (copy_to_user(lali->lali_name, oal->oal_name, sizeof(oal->oal_name)))
373 if (put_user(oal->oal_log_size, &lali->lali_log_size))
376 if (put_user(oal->oal_entry_size, &lali->lali_entry_size))
379 if (put_user(ocb->ocb_circ.head, &lali->_lali_head))
382 if (put_user(ocb->ocb_circ.tail, &lali->_lali_tail))
385 if (put_user(entry_space, &lali->_lali_entry_space))
388 if (put_user(entry_count, &lali->_lali_entry_count))
391 if (put_user(ocb->ocb_drop_count, &lali->_lali_drop_count))
394 if (put_user(oal->oal_is_closed, &lali->_lali_is_closed))
/* ioctl() for an oal device.
 * - LUSTRE_ACCESS_LOG_IOCTL_VERSION returns LUSTRE_ACCESS_LOG_VERSION_1.
 * - LUSTRE_ACCESS_LOG_IOCTL_INFO fills the user structure at @arg via
 *   oal_ioctl_info().
 * - LUSTRE_ACCESS_LOG_IOCTL_FILTER stores @arg in ocb_filter, which
 *   ofd_access() compares with the looked-up range index (0xffffffff
 *   accepts every entry).
 * NOTE(review): the switch header, default case and the return after
 * FILTER are not visible here. */
400 static long oal_file_ioctl(struct file *filp, unsigned int cmd,
403 struct oal_circ_buf *ocb = filp->private_data;
406 case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
407 return LUSTRE_ACCESS_LOG_VERSION_1;
408 case LUSTRE_ACCESS_LOG_IOCTL_INFO:
409 return oal_ioctl_info(ocb, (void __user *)arg);
410 case LUSTRE_ACCESS_LOG_IOCTL_FILTER:
411 ocb->ocb_filter = arg;
/* release() for an oal device: unlink this file's circular buffer from
 * the oal's buffer list under the write side of oal_buf_list_sem, then
 * free the buffer memory.
 * NOTE(review): freeing of ocb itself and the return statement are not
 * visible here. */
418 static int oal_file_release(struct inode *inode, struct file *filp)
420 struct oal_circ_buf *ocb = filp->private_data;
421 struct ofd_access_log *oal = ocb->ocb_access_log;
423 down_write(&oal->oal_buf_list_sem);
424 list_del(&ocb->ocb_list);
425 up_write(&oal->oal_buf_list_sem);
427 vfree(ocb->ocb_circ.buf);
/* File operations for the per-OST oal character devices; installed by
 * cdev_init() in ofd_access_log_create(). */
433 static const struct file_operations oal_fops = {
434 .owner = THIS_MODULE,
435 .open = &oal_file_open,
436 .release = &oal_file_release,
437 .unlocked_ioctl = &oal_file_ioctl,
438 .read = &oal_file_read,
439 .write = &oal_file_write,
440 .poll = &oal_file_poll,
441 #ifdef HAVE_NO_LLSEEK
442 .llseek = &no_llseek,
/* release() method of oal_device: give the minor back to the allocator
 * and assert that no circular buffers remain on the oal's list.
 * NOTE(review): the file header says this method also frees the oal; that
 * statement is not visible here. */
446 static void oal_device_release(struct device *dev)
448 struct ofd_access_log *oal = dev_get_drvdata(dev);
450 oal_log_minor_free(MINOR(oal->oal_device.devt));
451 BUG_ON(!list_empty(&oal->oal_circ_buf_list));
/* Create the access log for OST @ofd_name with per-reader circular
 * buffers of @size bytes, and register its character device.
 * @size must be a power of 2, a multiple of the entry size, and
 * representable as unsigned int; otherwise ERR_PTR(-EINVAL) is returned.
 * ERR_PTR(-ENOMEM) is returned after the kzalloc() of the oal (presumably
 * when it fails). After cdev_device_add() a control event is raised for
 * tasks polling the control device.
 * NOTE(review): several error checks, labels and the final return are not
 * visible here — confirm the unwinding order against the full file. */
455 struct ofd_access_log *ofd_access_log_create(const char *ofd_name, size_t size)
457 const size_t entry_size = sizeof(struct ofd_access_entry_v1);
458 struct ofd_access_log *oal;
462 BUILD_BUG_ON(sizeof(oal->oal_name) != MAX_OBD_NAME);
463 BUILD_BUG_ON(!is_power_of_2(entry_size));
468 if (!is_power_of_2(size) || (size & (entry_size - 1)) ||
469 (unsigned int)size != size)
470 return ERR_PTR(-EINVAL);
472 oal = kzalloc(sizeof(*oal), GFP_KERNEL);
474 return ERR_PTR(-ENOMEM);
476 strscpy(oal->oal_name, ofd_name, sizeof(oal->oal_name));
477 oal->oal_log_size = size;
478 oal->oal_entry_size = entry_size;
479 INIT_LIST_HEAD(&oal->oal_circ_buf_list);
480 init_rwsem(&oal->oal_buf_list_sem);
482 rc = oal_log_minor_alloc(&minor);
486 device_initialize(&oal->oal_device);
487 oal->oal_device.devt = MKDEV(oal_log_major, minor);
488 oal->oal_device.class = oal_log_class;
489 oal->oal_device.release = &oal_device_release;
490 dev_set_drvdata(&oal->oal_device, oal); /* lets oal_device_release() recover the oal */
491 rc = dev_set_name(&oal->oal_device,
492 "%s!%s", LUSTRE_ACCESS_LOG_DIR_NAME, oal->oal_name);
496 cdev_init(&oal->oal_cdev, &oal_fops);
497 oal->oal_cdev.owner = THIS_MODULE;
498 rc = cdev_device_add(&oal->oal_cdev, &oal->oal_device);
500 goto out_device_name;
502 oal_control_event_inc();
507 kfree_const(oal->oal_device.kobj.name);
509 oal_log_minor_free(minor);
/* Log one access on OFD device @m to every open reader of its access log.
 * The access is classified as OFD_ACCESS_READ or OFD_ACCESS_WRITE from rw
 * and is only logged when @m has an access log and that flag is set in
 * m->ofd_access_log_mask. The FLD is queried with the sequence of
 * @parent_fid so that each reader's ocb_filter can be compared against
 * range.lsr_index; a filter of 0xffffffff accepts every entry.
 * The buffer list is walked under the read side of oal_buf_list_sem.
 * NOTE(review): the early exit for a zero @parent_fid, some parameters,
 * some oae initializers and the handling of a failed lookup are not
 * visible here. */
516 void ofd_access(const struct lu_env *env,
517 struct ofd_device *m,
518 const struct lu_fid *parent_fid,
519 __u64 begin, __u64 end,
521 unsigned int segment_count,
524 unsigned int flags = (rw == READ) ? OFD_ACCESS_READ : OFD_ACCESS_WRITE;
525 struct ofd_access_log *oal = m->ofd_access_log;
527 /* obdfilter-survey does not set parent FIDs. */
528 if (fid_is_zero(parent_fid))
531 if (oal && (flags & m->ofd_access_log_mask)) {
532 struct ofd_access_entry_v1 oae = {
533 .oae_parent_fid = *parent_fid,
536 .oae_time = ktime_get_real_seconds(),
538 .oae_segment_count = segment_count,
541 struct lu_seq_range range = {
542 .lsr_flags = LU_SEQ_RANGE_ANY,
544 struct oal_circ_buf *ocb;
547 /* learn target MDT from FID's sequence */
548 rc = fld_server_lookup(env, m->ofd_seq_site.ss_server_fld,
549 fid_seq(parent_fid), &range);
551 CERROR("%s: can't resolve "DFID": rc=%d\n",
552 ofd_name(m), PFID(parent_fid), rc);
554 down_read(&oal->oal_buf_list_sem);
555 list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list) {
556 /* filter by MDT index if requested */
557 if (ocb->ocb_filter == 0xffffffff ||
558 range.lsr_index == ocb->ocb_filter)
559 oal_write_entry(ocb, &oae, sizeof(oae)); /* return value is not checked */
561 up_read(&oal->oal_buf_list_sem);
565 /* Called on OST umount to:
566 * - Close the write end of the oal. This wakes any tasks sleeping in
567 * read or poll and makes all reads return zero once the log
569 * - Delete the associated struct device and cdev, preventing new
570 * opens. Existing opens retain a reference on the oal through
571 * their reference on oal_device.
572 * The oal will be freed when the last open file handle is closed. */
573 void ofd_access_log_delete(struct ofd_access_log *oal)
575 struct oal_circ_buf *ocb;
580 oal->oal_is_closed = 1; /* set before the wake-ups so woken readers see the closed state */
581 down_read(&oal->oal_buf_list_sem);
582 list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list)
583 wake_up(&ocb->ocb_read_wait_queue);
584 up_read(&oal->oal_buf_list_sem);
585 cdev_device_del(&oal->oal_cdev, &oal->oal_device);
586 put_device(&oal->oal_device); /* drop the reference held by this function's caller side of the oal */
/* Allocated zeroed in oal_control_file_open() and freed in
 * oal_control_file_release(). Its ccf_event_count member holds the value
 * of oal_control_event_count last fetched by the PRESCAN ioctl. */
589 /* private_data for control device file. */
590 struct oal_control_file {
/* oal_control_file_open(): open() for the control device. Marks the file
 * nonseekable and attaches a zeroed struct oal_control_file as
 * filp->private_data.
 * NOTE(review): the error checks and return statement are not visible
 * here. */
594 /* Control file usage:
595 * Open /dev/lustre-access-log/control.
597 * Poll for readable on control FD.
598 * Call ioctl(FD, LUSTRE_ACCESS_LOG_IOCTL_PRESCAN) to fetch event count.
599 * Scan /dev/ or /sys/class/... for new devices.
601 static int oal_control_file_open(struct inode *inode, struct file *filp)
603 struct oal_control_file *ccf;
606 rc = nonseekable_open(inode, filp);
610 /* ccf->ccf_event_count = 0 on open */
611 ccf = kzalloc(sizeof(*ccf), GFP_KERNEL);
615 filp->private_data = ccf;
/* release() for the control device: free the struct oal_control_file
 * attached by oal_control_file_open(). */
620 static int oal_control_file_release(struct inode *inode, struct file *filp)
622 kfree(filp->private_data);
/* poll() for the control device: register on oal_control_wait_queue and
 * test whether the global event count differs from the value this file
 * last recorded.
 * NOTE(review): the mask bits set in that case and the return statement
 * are not visible here. */
626 static unsigned int oal_control_file_poll(struct file *filp, poll_table *wait)
628 struct oal_control_file *ccf = filp->private_data;
629 unsigned int mask = 0;
631 poll_wait(filp, &oal_control_wait_queue, wait);
633 if (atomic_read(&oal_control_event_count) != ccf->ccf_event_count)
/* ioctl() for the control device.
 * - LUSTRE_ACCESS_LOG_IOCTL_VERSION returns LUSTRE_ACCESS_LOG_VERSION_1.
 * - LUSTRE_ACCESS_LOG_IOCTL_MAJOR returns the major number of the oal
 *   devices.
 * - LUSTRE_ACCESS_LOG_IOCTL_PRESCAN records the current global event
 *   count in this file's private data, which is what
 *   oal_control_file_poll() compares against.
 * NOTE(review): the switch header, default case and the return after
 * PRESCAN are not visible here. */
639 static long oal_control_file_ioctl(struct file *filp, unsigned int cmd,
642 struct oal_control_file *ccf = filp->private_data;
645 case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
646 return LUSTRE_ACCESS_LOG_VERSION_1;
647 case LUSTRE_ACCESS_LOG_IOCTL_MAJOR:
648 return oal_log_major;
649 case LUSTRE_ACCESS_LOG_IOCTL_PRESCAN:
650 ccf->ccf_event_count = atomic_read(&oal_control_event_count);
/* File operations for the control device; referenced by
 * oal_control_misc. No read or write method is provided. */
657 static const struct file_operations oal_control_fops = {
658 .owner = THIS_MODULE,
659 .open = &oal_control_file_open,
660 .release = &oal_control_file_release,
661 .poll = &oal_control_file_poll,
662 .unlocked_ioctl = &oal_control_file_ioctl,
663 .llseek = &noop_llseek,
/* Misc device backing the control file; registered in
 * ofd_access_log_module_init() with a dynamically assigned minor. */
666 static struct miscdevice oal_control_misc = {
667 .minor = MISC_DYNAMIC_MINOR,
668 .name = LUSTRE_ACCESS_LOG_DIR_NAME"!control",
669 .fops = &oal_control_fops,
/* Module setup: register the control misc device, reserve OAL_DEV_COUNT
 * character device numbers (recording the major in oal_log_major), and
 * create the device class. On failure the steps already completed are
 * undone: the chrdev region is unregistered and then the misc device is
 * deregistered.
 * NOTE(review): the rc checks, the success return and the first error
 * label are not visible here. */
672 int ofd_access_log_module_init(void)
677 BUILD_BUG_ON(!is_power_of_2(sizeof(struct ofd_access_entry_v1)));
679 rc = misc_register(&oal_control_misc);
683 rc = alloc_chrdev_region(&dev, 0, OAL_DEV_COUNT,
684 LUSTRE_ACCESS_LOG_DIR_NAME);
686 goto out_oal_control_misc;
688 oal_log_major = MAJOR(dev);
690 oal_log_class = ll_class_create(LUSTRE_ACCESS_LOG_DIR_NAME);
691 if (IS_ERR(oal_log_class)) {
692 rc = PTR_ERR(oal_log_class);
698 unregister_chrdev_region(dev, OAL_DEV_COUNT);
699 out_oal_control_misc:
700 misc_deregister(&oal_control_misc);
/* Module teardown: destroy the device class, release the chrdev region,
 * destroy the minor idr, and deregister the control misc device. */
705 void ofd_access_log_module_exit(void)
707 class_destroy(oal_log_class);
708 unregister_chrdev_region(MKDEV(oal_log_major, 0), OAL_DEV_COUNT);
709 idr_destroy(&oal_log_minor_idr);
710 misc_deregister(&oal_control_misc);