1 #include <linux/cdev.h>
2 #include <linux/circ_buf.h>
3 #include <linux/device.h>
6 #include <linux/kernel.h>
7 #include <linux/miscdevice.h>
8 #include <linux/module.h>
9 #include <linux/poll.h>
10 #include <linux/slab.h>
11 #include <linux/types.h>
12 #include <linux/uaccess.h>
13 #include <uapi/linux/lustre/lustre_idl.h>
14 #include <uapi/linux/lustre/lustre_access_log.h>
15 #include "ofd_internal.h"
17 /* OFD access logs: OST (OFD) RPC handlers log accesses by FID and
18 * PFID which are read from userspace through character device files
19 * (/dev/lustre-access-log/scratch-OST0000). Accesses are described by
20 * struct ofd_access_entry_v1. The char device implements read()
21 * (blocking and nonblocking) and poll(), along with an ioctl that
22 * returns diagnostic information on an oal device.
24 * A control device (/dev/lustre-access-log/control) supports an ioctl()
 * plus a poll() method for oal discovery. See uses of
26 * oal_control_event_count and oal_control_wait_queue for details.
28 * oal log size and entry size are restricted to powers of 2 to
29 * support circ_buf methods. See Documentation/core-api/circular-buffers.rst
30 * in the linux tree for more information.
32 * The associated struct device (*oal_device) owns the oal. The
33 * release() method of oal_device frees the oal and releases its
34 * minor. This may seem slightly more complicated than necessary but
35 * it allows the OST to be unmounted while the oal still has open file
40 OAL_DEV_COUNT = 1 << MINORBITS,
/* One access log per OST. The embedded oal_device owns the oal: its
 * release() method (oal_device_release) frees the minor, so the oal
 * outlives an OST umount while readers still hold open file handles. */
struct ofd_access_log {
	char oal_name[128]; /* lustre-OST0000 */
	struct device oal_device;
	struct rw_semaphore oal_buf_list_sem; /* protects oal_circ_buf_list */
	struct list_head oal_circ_buf_list; /* one oal_circ_buf per open file */
	unsigned int oal_is_closed; /* set on umount; readers drain, then EOF */
	unsigned int oal_log_size; /* buffer size in bytes, a power of 2 */
	unsigned int oal_entry_size; /* entry size in bytes, a power of 2 */
	/* NOTE(review): members below belong to struct oal_circ_buf (the
	 * per-open-file reader buffer); its opening line is outside this
	 * view — confirm against the full file. */
	struct list_head ocb_list; /* linkage into oal->oal_circ_buf_list */
	spinlock_t ocb_write_lock; /* serializes producers updating circ.head */
	spinlock_t ocb_read_lock; /* serializes consumers updating circ.tail */
	struct ofd_access_log *ocb_access_log; /* owning log */
	wait_queue_head_t ocb_read_wait_queue; /* blocking readers sleep here */
	unsigned int ocb_drop_count; /* entries discarded because buffer was full */
	struct circ_buf ocb_circ; /* head advanced by writers, tail by readers */
/* Control-device discovery state: the event count is bumped (and the wait
 * queue woken) in oal_control_event_inc() whenever a new oal is created. */
static atomic_t oal_control_event_count = ATOMIC_INIT(0);
static DECLARE_WAIT_QUEUE_HEAD(oal_control_wait_queue);
static struct class *oal_log_class; /* /sys/class entry for oal devices */
static unsigned int oal_log_major; /* dynamic char major for oal devices */
static DEFINE_IDR(oal_log_minor_idr); /* TODO Use ida instead. */
static DEFINE_SPINLOCK(oal_log_minor_lock); /* protects oal_log_minor_idr */
/* A log size is valid iff it is a power of 2 (required by the circ_buf
 * index masking), holds at least two entries, and is at most 1 GiB. */
bool ofd_access_log_size_is_valid(unsigned int size)
	const unsigned int size_min = 2 * sizeof(struct ofd_access_entry_v1);
	const unsigned int size_max = 1U << 30;
	return is_power_of_2(size) && size_min <= size && size <= size_max;
/* Record a control event (a new oal was created) and wake any pollers on
 * the control device so they can rescan for new log devices. */
static void oal_control_event_inc(void)
	atomic_inc(&oal_control_event_count);
	wake_up(&oal_control_wait_queue);
/* Allocate a free device minor for a new oal from the shared IDR.
 * NOTE(review): the error check and the store to *pminor fall outside
 * this view — confirm against the full function. */
static int oal_log_minor_alloc(int *pminor)
	/* Sentinel stored in the IDR; the slot value itself is never used. */
	void *OAL_LOG_MINOR_ALLOCED = (void *)-1;
	/* Preload so idr_alloc() can use GFP_NOWAIT under the spinlock. */
	idr_preload(GFP_KERNEL);
	spin_lock(&oal_log_minor_lock);
	minor = idr_alloc(&oal_log_minor_idr, OAL_LOG_MINOR_ALLOCED, 0,
			  OAL_DEV_COUNT, GFP_NOWAIT);
	spin_unlock(&oal_log_minor_lock);
/* Return an oal device minor to the IDR. Called from oal_device_release()
 * and from the error path of ofd_access_log_create(). */
static void oal_log_minor_free(int minor)
	spin_lock(&oal_log_minor_lock);
	idr_remove(&oal_log_minor_idr, minor);
	spin_unlock(&oal_log_minor_lock);
/* True if the buffer holds less than one complete entry. Entries are
 * written whole (see oal_write_entry), so "empty" means no entry is
 * ready for a reader. */
static bool oal_is_empty(struct oal_circ_buf *ocb)
	struct ofd_access_log *oal = ocb->ocb_access_log;
	return CIRC_CNT(ocb->ocb_circ.head,
			oal->oal_log_size) < oal->oal_entry_size;
/* Producer side: append one whole entry to ocb's circular buffer under
 * ocb_write_lock, dropping it (and bumping ocb_drop_count) when there is
 * not enough space. Wakes any blocked readers after a successful write. */
static ssize_t oal_write_entry(struct oal_circ_buf *ocb,
			       const void *entry, size_t entry_size)
	struct ofd_access_log *oal = ocb->ocb_access_log;
	struct circ_buf *circ = &ocb->ocb_circ;
	/* Reject partial entries; the buffer only ever holds whole entries. */
	if (entry_size != oal->oal_entry_size)
	spin_lock(&ocb->ocb_write_lock);
	/* Pairs with the smp_store_release() of tail in oal_read_entry(). */
	tail = READ_ONCE(circ->tail);
	/* CIRC_SPACE() return space available, 0..oal_log_size -
	 * 1. It always leaves one free char, since a completely full
	 * buffer would have head == tail, which is the same as empty. */
	if (CIRC_SPACE(head, tail, oal->oal_log_size) < oal->oal_entry_size) {
		ocb->ocb_drop_count++;
	memcpy(&circ->buf[head], entry, entry_size);
	/* Ensure the entry is stored before we update the head. */
	smp_store_release(&circ->head,
			  (head + oal->oal_entry_size) & (oal->oal_log_size - 1));
	wake_up(&ocb->ocb_read_wait_queue);
	spin_unlock(&ocb->ocb_write_lock);
/* Read one entry from the log and return its size. Non-blocking.
 * When the log is empty we return -EAGAIN if the OST is still mounted
 * and 0 (EOF) once oal_is_closed has been set. */
static ssize_t oal_read_entry(struct oal_circ_buf *ocb,
			      void *entry_buf, size_t entry_buf_size)
	struct ofd_access_log *oal = ocb->ocb_access_log;
	struct circ_buf *circ = &ocb->ocb_circ;
	/* XXX This method may silently truncate entries when
	 * entry_buf_size is less than oal_entry_size. But that's OK
	 * because you know what you are doing. */
	spin_lock(&ocb->ocb_read_lock);
	/* Memory barrier usage follows circular-buffers.txt. */
	head = smp_load_acquire(&circ->head);
	if (!CIRC_CNT(head, tail, oal->oal_log_size)) {
		rc = oal->oal_is_closed ? 0 : -EAGAIN;
	/* Writers commit only whole entries, so a nonempty buffer always
	 * holds at least one complete entry. */
	BUG_ON(CIRC_CNT(head, tail, oal->oal_log_size) < oal->oal_entry_size);
	/* Extract one entry from the buffer. */
	rc = min_t(size_t, oal->oal_entry_size, entry_buf_size);
	memcpy(entry_buf, &circ->buf[tail], rc);
	/* Memory barrier usage follows circular-buffers.txt. */
	smp_store_release(&circ->tail,
			  (tail + oal->oal_entry_size) & (oal->oal_log_size - 1));
	spin_unlock(&ocb->ocb_read_lock);
/* open() for an oal device: allocate a per-open circular buffer (one
 * independent read cursor per file handle), link it into the log's
 * reader list, and stash it in filp->private_data. */
static int oal_file_open(struct inode *inode, struct file *filp)
	struct ofd_access_log *oal;
	struct oal_circ_buf *ocb;
	oal = container_of(inode->i_cdev, struct ofd_access_log, oal_cdev);
	ocb = kzalloc(sizeof(*ocb), GFP_KERNEL);
	/* The log buffer may be large (validated up to 1 GiB), so vmalloc. */
	ocb->ocb_circ.buf = vmalloc(oal->oal_log_size);
	if (!ocb->ocb_circ.buf) {
	spin_lock_init(&ocb->ocb_write_lock);
	spin_lock_init(&ocb->ocb_read_lock);
	ocb->ocb_access_log = oal;
	init_waitqueue_head(&ocb->ocb_read_wait_queue);
	down_write(&oal->oal_buf_list_sem);
	list_add(&ocb->ocb_list, &oal->oal_circ_buf_list);
	up_write(&oal->oal_buf_list_sem);
	filp->private_data = ocb;
	return nonseekable_open(inode, filp);
/* User buffer size must be a multiple of ofd access entry size.
 * Copies as many whole entries as fit in count; blocks (unless
 * O_NONBLOCK) when the buffer is empty and the log is still open. */
static ssize_t oal_file_read(struct file *filp, char __user *buf, size_t count,
	struct oal_circ_buf *ocb = filp->private_data;
	struct ofd_access_log *oal = ocb->ocb_access_log;
	/* Entry size is a power of 2, so this mask test rejects any count
	 * that is not a whole number of entries. */
	if (count & (oal->oal_entry_size - 1))
	/* Bounce buffer: entries are staged here before copy_to_user(). */
	entry = kzalloc(oal->oal_entry_size, GFP_KERNEL);
	while (size < count) {
		rc = oal_read_entry(ocb, entry, oal->oal_entry_size);
		if (filp->f_flags & O_NONBLOCK)
		/* Blocking read: sleep until data arrives or the log closes. */
		rc = wait_event_interruptible(ocb->ocb_read_wait_queue,
				!oal_is_empty(ocb) || oal->oal_is_closed);
		} else if (rc <= 0) {
			break; /* closed or error */
		if (copy_to_user(buf, entry, oal->oal_entry_size)) {
		buf += oal->oal_entry_size;
		size += oal->oal_entry_size;
	/* Return bytes copied, or the last rc when nothing was copied. */
	return size ? size : rc;
/* Included for test purposes. User buffer size must be a multiple of
 * ofd access entry size. Injects entries into this reader's own buffer
 * via oal_write_entry(). */
static ssize_t oal_file_write(struct file *filp, const char __user *buf,
			      size_t count, loff_t *ppos)
	struct oal_circ_buf *ocb = filp->private_data;
	struct ofd_access_log *oal = ocb->ocb_access_log;
	/* Reject writes that are not a whole number of entries. */
	if (count & (oal->oal_entry_size - 1))
	/* Bounce buffer for copy_from_user() before committing an entry. */
	entry = kzalloc(oal->oal_entry_size, GFP_KERNEL);
	while (size < count) {
		if (copy_from_user(entry, buf, oal->oal_entry_size)) {
		rc = oal_write_entry(ocb, entry, oal->oal_entry_size);
		buf += oal->oal_entry_size;
		size += oal->oal_entry_size;
	/* Return bytes consumed, or the last rc when nothing was written. */
	return size > 0 ? size : rc;
/* poll() for an oal device: readable when at least one complete entry
 * is buffered, or when the log has been closed (so readers see EOF). */
unsigned int oal_file_poll(struct file *filp, struct poll_table_struct *wait)
	struct oal_circ_buf *ocb = filp->private_data;
	struct ofd_access_log *oal = ocb->ocb_access_log;
	unsigned int mask = 0;
	poll_wait(filp, &ocb->ocb_read_wait_queue, wait);
	spin_lock(&ocb->ocb_read_lock);
	if (!oal_is_empty(ocb) || oal->oal_is_closed)
	spin_unlock(&ocb->ocb_read_lock);
/* LUSTRE_ACCESS_LOG_IOCTL_INFO: fill the userspace
 * lustre_access_log_info_v1 at arg with diagnostics for this oal and
 * this reader's buffer.
 * NOTE(review): head/tail are read without taking the ocb locks, so the
 * counts are a racy snapshot — presumably acceptable for diagnostics;
 * confirm. */
static long oal_ioctl_info(struct oal_circ_buf *ocb, unsigned long arg)
	struct ofd_access_log *oal = ocb->ocb_access_log;
	struct lustre_access_log_info_v1 __user *lali;
	/* Express occupancy/space in whole entries, not bytes. */
	u32 entry_count = CIRC_CNT(ocb->ocb_circ.head,
			oal->oal_log_size) / oal->oal_entry_size;
	u32 entry_space = CIRC_SPACE(ocb->ocb_circ.head,
			oal->oal_log_size) / oal->oal_entry_size;
	lali = (struct lustre_access_log_info_v1 __user *)arg;
	BUILD_BUG_ON(sizeof(lali->lali_name) != sizeof(oal->oal_name));
	if (put_user(LUSTRE_ACCESS_LOG_VERSION_1, &lali->lali_version))
	if (put_user(LUSTRE_ACCESS_LOG_TYPE_OFD, &lali->lali_type))
	if (copy_to_user(lali->lali_name, oal->oal_name, sizeof(oal->oal_name)))
	if (put_user(oal->oal_log_size, &lali->lali_log_size))
	if (put_user(oal->oal_entry_size, &lali->lali_entry_size))
	if (put_user(ocb->ocb_circ.head, &lali->_lali_head))
	if (put_user(ocb->ocb_circ.tail, &lali->_lali_tail))
	if (put_user(entry_space, &lali->_lali_entry_space))
	if (put_user(entry_count, &lali->_lali_entry_count))
	if (put_user(ocb->ocb_drop_count, &lali->_lali_drop_count))
	if (put_user(oal->oal_is_closed, &lali->_lali_is_closed))
/* ioctl() dispatch for an oal device file. */
static long oal_file_ioctl(struct file *filp, unsigned int cmd,
	struct oal_circ_buf *ocb = filp->private_data;
	case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
		return LUSTRE_ACCESS_LOG_VERSION_1;
	case LUSTRE_ACCESS_LOG_IOCTL_INFO:
		return oal_ioctl_info(ocb, arg);
	case LUSTRE_ACCESS_LOG_IOCTL_FILTER:
		/* Restrict this reader to one MDT index; 0xffffffff means
		 * "all MDTs" (see the filter test in ofd_access()). */
		ocb->ocb_filter = arg;
/* release() for an oal device file: unlink this reader's buffer from the
 * log and free it. Per the file-top comment, dropping the file's device
 * reference may then free the oal itself. */
static int oal_file_release(struct inode *inode, struct file *filp)
	struct oal_circ_buf *ocb = filp->private_data;
	struct ofd_access_log *oal = ocb->ocb_access_log;
	down_write(&oal->oal_buf_list_sem);
	list_del(&ocb->ocb_list);
	up_write(&oal->oal_buf_list_sem);
	vfree(ocb->ocb_circ.buf);
/* File operations for the per-OST oal character devices. */
static const struct file_operations oal_fops = {
	.owner = THIS_MODULE,
	.open = &oal_file_open,
	.release = &oal_file_release,
	.unlocked_ioctl = &oal_file_ioctl,
	.read = &oal_file_read,
	.write = &oal_file_write,
	.poll = &oal_file_poll,
	.llseek = &no_llseek,
/* struct device release(): runs when the last reference to an oal device
 * is dropped; returns its minor to the IDR. By then every open file must
 * have released its buffer, hence the BUG_ON. */
static void oal_device_release(struct device *dev)
	struct ofd_access_log *oal = dev_get_drvdata(dev);
	oal_log_minor_free(MINOR(oal->oal_device.devt));
	BUG_ON(!list_empty(&oal->oal_circ_buf_list));
/* Create and register the access log (struct device + cdev) for
 * @ofd_name.
 * @size: log buffer size in bytes; must be a power of 2, a multiple of
 *        the entry size, and fit in an unsigned int.
 * Returns the new log, or an ERR_PTR() on failure. */
struct ofd_access_log *ofd_access_log_create(const char *ofd_name, size_t size)
	const size_t entry_size = sizeof(struct ofd_access_entry_v1);
	struct ofd_access_log *oal;
	BUILD_BUG_ON(sizeof(oal->oal_name) != MAX_OBD_NAME);
	BUILD_BUG_ON(!is_power_of_2(entry_size));
	if (!is_power_of_2(size) || (size & (entry_size - 1)) ||
	    (unsigned int)size != size)
		return ERR_PTR(-EINVAL);
	oal = kzalloc(sizeof(*oal), GFP_KERNEL);
		return ERR_PTR(-ENOMEM);
	strlcpy(oal->oal_name, ofd_name, sizeof(oal->oal_name));
	oal->oal_log_size = size;
	oal->oal_entry_size = entry_size;
	INIT_LIST_HEAD(&oal->oal_circ_buf_list);
	init_rwsem(&oal->oal_buf_list_sem);
	rc = oal_log_minor_alloc(&minor);
	device_initialize(&oal->oal_device);
	oal->oal_device.devt = MKDEV(oal_log_major, minor);
	oal->oal_device.class = oal_log_class;
	oal->oal_device.release = &oal_device_release;
	dev_set_drvdata(&oal->oal_device, oal);
	/* '!' in the device name maps to '/' under /dev (see the
	 * /dev/lustre-access-log/<name> paths in the file-top comment). */
	rc = dev_set_name(&oal->oal_device,
			  "%s!%s", LUSTRE_ACCESS_LOG_DIR_NAME, oal->oal_name);
	cdev_init(&oal->oal_cdev, &oal_fops);
	oal->oal_cdev.owner = THIS_MODULE;
	rc = cdev_device_add(&oal->oal_cdev, &oal->oal_device);
		goto out_device_name;
	/* Tell control-device pollers that a new log device exists. */
	oal_control_event_inc();
	/* Error unwinding: undo dev_set_name() and the minor allocation. */
	kfree_const(oal->oal_device.kobj.name);
	oal_log_minor_free(minor);
/* Log one read or write access on this OFD. Called from the OST RPC
 * handlers; appends an ofd_access_entry_v1 to every open reader buffer
 * whose MDT-index filter matches the parent FID's MDT. */
void ofd_access(const struct lu_env *env,
		struct ofd_device *m,
		const struct lu_fid *parent_fid,
		__u64 begin, __u64 end,
		unsigned int segment_count,
	unsigned int flags = (rw == READ) ? OFD_ACCESS_READ : OFD_ACCESS_WRITE;
	struct ofd_access_log *oal = m->ofd_access_log;
	/* obdfilter-survey does not set parent FIDs. */
	if (fid_is_zero(parent_fid))
	/* Only log access types enabled in ofd_access_log_mask. */
	if (oal && (flags & m->ofd_access_log_mask)) {
		struct ofd_access_entry_v1 oae = {
			.oae_parent_fid = *parent_fid,
			.oae_time = ktime_get_real_seconds(),
			.oae_segment_count = segment_count,
		struct lu_seq_range range = {
			.lsr_flags = LU_SEQ_RANGE_ANY,
		struct oal_circ_buf *ocb;
		/* learn target MDT from FID's sequence */
		rc = fld_server_lookup(env, m->ofd_seq_site.ss_server_fld,
				       fid_seq(parent_fid), &range);
			CERROR("%s: can't resolve "DFID": rc=%d\n",
			       ofd_name(m), PFID(parent_fid), rc);
		/* Fan the entry out to every matching reader buffer. */
		down_read(&oal->oal_buf_list_sem);
		list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list) {
			/* filter by MDT index if requested */
			if (ocb->ocb_filter == 0xffffffff ||
			    range.lsr_index == ocb->ocb_filter)
				oal_write_entry(ocb, &oae, sizeof(oae));
		up_read(&oal->oal_buf_list_sem);
/* Called on OST umount to:
 * - Close the write end of the oal. This wakes any tasks sleeping in
 *   read or poll and makes all reads return zero once the log
 * - Delete the associated struct device and cdev, preventing new
 *   opens. Existing opens retain a reference on the oal through
 *   their reference on oal_device.
 * The oal will be freed when the last open file handle is closed. */
void ofd_access_log_delete(struct ofd_access_log *oal)
	struct oal_circ_buf *ocb;
	oal->oal_is_closed = 1;
	/* Wake all blocked readers so they notice oal_is_closed. */
	down_read(&oal->oal_buf_list_sem);
	list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list)
		wake_up(&ocb->ocb_read_wait_queue);
	up_read(&oal->oal_buf_list_sem);
	cdev_device_del(&oal->oal_cdev, &oal->oal_device);
581 /* private_data for control device file. */
582 struct oal_control_file {
586 /* Control file usage:
587 * Open /dev/lustre-access-log/control.
589 * Poll for readable on control FD.
590 * Call ioctl(FD, LUSTRE_ACCESS_LOG_IOCTL_PRESCAN) to fetch event count.
591 * Scan /dev/ or /sys/class/... for new devices.
/* open() for the control device: allocate per-open event-count state so
 * each FD tracks which discovery events it has already seen. */
static int oal_control_file_open(struct inode *inode, struct file *filp)
	struct oal_control_file *ccf;
	rc = nonseekable_open(inode, filp);
	/* ccf->ccf_event_count = 0 on open */
	ccf = kzalloc(sizeof(*ccf), GFP_KERNEL);
	filp->private_data = ccf;
/* release() for the control device: free the per-open state. */
static int oal_control_file_release(struct inode *inode, struct file *filp)
	kfree(filp->private_data);
/* poll() on the control device: readable when new oal devices have been
 * created since this FD's last PRESCAN ioctl snapshot. */
static unsigned int oal_control_file_poll(struct file *filp, poll_table *wait)
	struct oal_control_file *ccf = filp->private_data;
	unsigned int mask = 0;
	poll_wait(filp, &oal_control_wait_queue, wait);
	if (atomic_read(&oal_control_event_count) != ccf->ccf_event_count)
/* ioctl() dispatch for the control device. */
static long oal_control_file_ioctl(struct file *filp, unsigned int cmd,
	struct oal_control_file *ccf = filp->private_data;
	case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
		return LUSTRE_ACCESS_LOG_VERSION_1;
	case LUSTRE_ACCESS_LOG_IOCTL_MAJOR:
		return oal_log_major;
	case LUSTRE_ACCESS_LOG_IOCTL_PRESCAN:
		/* Snapshot the event count; poll() reports readable again
		 * only after the global count moves past this snapshot. */
		ccf->ccf_event_count = atomic_read(&oal_control_event_count);
/* File operations for the control (discovery) device. */
static const struct file_operations oal_control_fops = {
	.owner = THIS_MODULE,
	.open = &oal_control_file_open,
	.release = &oal_control_file_release,
	.poll = &oal_control_file_poll,
	.unlocked_ioctl = &oal_control_file_ioctl,
	.llseek = &noop_llseek,
/* Misc device backing /dev/lustre-access-log/control ('!' maps to '/'
 * in the device name). */
static struct miscdevice oal_control_misc = {
	.minor = MISC_DYNAMIC_MINOR,
	.name = LUSTRE_ACCESS_LOG_DIR_NAME"!control",
	.fops = &oal_control_fops,
/* Module init: register the control misc device, reserve a char-major
 * region for oal devices, and create their device class. */
int ofd_access_log_module_init(void)
	/* circ_buf index masking requires a power-of-2 entry size. */
	BUILD_BUG_ON(!is_power_of_2(sizeof(struct ofd_access_entry_v1)));
	rc = misc_register(&oal_control_misc);
	rc = alloc_chrdev_region(&dev, 0, OAL_DEV_COUNT,
				 LUSTRE_ACCESS_LOG_DIR_NAME);
		goto out_oal_control_misc;
	oal_log_major = MAJOR(dev);
	oal_log_class = class_create(THIS_MODULE, LUSTRE_ACCESS_LOG_DIR_NAME);
	if (IS_ERR(oal_log_class)) {
		rc = PTR_ERR(oal_log_class);
	/* Error unwinding in reverse order of setup. */
	unregister_chrdev_region(dev, OAL_DEV_COUNT);
out_oal_control_misc:
	misc_deregister(&oal_control_misc);
/* Module exit: tear everything down in reverse order of
 * ofd_access_log_module_init(). */
void ofd_access_log_module_exit(void)
	class_destroy(oal_log_class);
	unregister_chrdev_region(MKDEV(oal_log_major, 0), OAL_DEV_COUNT);
	idr_destroy(&oal_log_minor_idr);
	misc_deregister(&oal_control_misc);