1 #include <linux/cdev.h>
2 #include <linux/circ_buf.h>
3 #include <linux/device.h>
6 #include <linux/kernel.h>
7 #include <linux/miscdevice.h>
8 #include <linux/module.h>
9 #include <linux/poll.h>
10 #include <linux/slab.h>
11 #include <linux/types.h>
12 #include <linux/uaccess.h>
13 #include <uapi/linux/lustre/lustre_idl.h>
14 #include <uapi/linux/lustre/lustre_access_log.h>
15 #include "ofd_internal.h"
17 /* OFD access logs: OST (OFD) RPC handlers log accesses by FID and
18 * PFID which are read from userspace through character device files
19 * (/dev/lustre-access-log/scratch-OST0000). Accesses are described by
20 * struct ofd_access_entry_v1. The char device implements read()
21 * (blocking and nonblocking) and poll(), along with an ioctl that
22 * returns diagnostic information on an oal device.
24 * A control device (/dev/lustre-access-log/control) supports an ioctl()
 * plus poll() method for oal discovery. See uses of
26 * oal_control_event_count and oal_control_wait_queue for details.
28 * oal log size and entry size are restricted to powers of 2 to
29 * support circ_buf methods. See Documentation/core-api/circular-buffers.rst
30 * in the linux tree for more information.
32 * The associated struct device (*oal_device) owns the oal. The
33 * release() method of oal_device frees the oal and releases its
34 * minor. This may seem slightly more complicated than necessary but
35 * it allows the OST to be unmounted while the oal still has open file
	OAL_DEV_COUNT = 1 << MINORBITS, /* max oal devices: one per chrdev minor */
/* One access log per mounted OST. Owned by its embedded struct device;
 * oal_device_release() runs when the last reference is dropped. */
struct ofd_access_log {
	char oal_name[128]; /* lustre-OST0000 */
	struct device oal_device; /* owns the oal; see oal_device_release() */
	struct rw_semaphore oal_buf_list_sem; /* protects oal_circ_buf_list */
	struct list_head oal_circ_buf_list; /* one oal_circ_buf per open reader */
	unsigned int oal_is_closed; /* set at umount; readers drain then see EOF */
	unsigned int oal_log_size; /* per-reader buffer size in bytes, power of 2 */
	unsigned int oal_entry_size; /* sizeof(struct ofd_access_entry_v1), power of 2 */
	struct list_head ocb_list; /* linked on oal_circ_buf_list */
	spinlock_t ocb_write_lock; /* serializes producers (oal_write_entry) */
	spinlock_t ocb_read_lock; /* serializes consumers (oal_read_entry) */
	struct ofd_access_log *ocb_access_log; /* back pointer to owning oal */
	wait_queue_head_t ocb_read_wait_queue; /* readers sleep here until data or close */
	unsigned int ocb_drop_count; /* entries discarded because the buffer was full */
	struct circ_buf ocb_circ; /* head/tail indexes plus vmalloc'd data buffer */
/* Bumped on oal create (and presumably delete — tail of file not fully
 * visible here); the control device's poll()/ioctl() compare against a
 * per-open snapshot so userspace can discover new oal devices. */
static atomic_t oal_control_event_count = ATOMIC_INIT(0);
static DECLARE_WAIT_QUEUE_HEAD(oal_control_wait_queue);

static struct class *oal_log_class; /* sysfs class for oal devices */
static unsigned int oal_log_major; /* chrdev major shared by all oal devices */
static DEFINE_IDR(oal_log_minor_idr); /* TODO Use ida instead. */
static DEFINE_SPINLOCK(oal_log_minor_lock); /* protects oal_log_minor_idr */
/* A log size is valid when it is a power of 2 (required by the
 * circ_buf index masking), holds at least two entries, and is at most
 * 1 GiB. */
bool ofd_access_log_size_is_valid(unsigned int size)
	const unsigned int size_min = 2 * sizeof(struct ofd_access_entry_v1);
	const unsigned int size_max = 1U << 30;

	return is_power_of_2(size) && size_min <= size && size <= size_max;
/* Record a control event and wake any task polling the control device. */
static void oal_control_event_inc(void)
	atomic_inc(&oal_control_event_count);
	wake_up(&oal_control_wait_queue);
/* Reserve an unused minor number for a new oal device.
 * NOTE(review): the tail of this function (idr_preload_end, error
 * handling, *pminor assignment) is not visible in this chunk. */
static int oal_log_minor_alloc(int *pminor)
	void *OAL_LOG_MINOR_ALLOCED = (void *)-1;

	idr_preload(GFP_KERNEL);
	spin_lock(&oal_log_minor_lock);
	/* The stored pointer is only a sentinel; we just need the minor. */
	minor = idr_alloc(&oal_log_minor_idr, OAL_LOG_MINOR_ALLOCED, 0,
			OAL_DEV_COUNT, GFP_NOWAIT);
	spin_unlock(&oal_log_minor_lock);
/* Return a minor number reserved by oal_log_minor_alloc(). */
static void oal_log_minor_free(int minor)
	spin_lock(&oal_log_minor_lock);
	idr_remove(&oal_log_minor_idr, minor);
	spin_unlock(&oal_log_minor_lock);
/* True when ocb holds less than one complete entry. */
static bool oal_is_empty(struct oal_circ_buf *ocb)
	struct ofd_access_log *oal = ocb->ocb_access_log;

	return CIRC_CNT(ocb->ocb_circ.head,
		oal->oal_log_size) < oal->oal_entry_size;
/* Copy one fixed-size entry into ocb's circular buffer, or drop it
 * (bumping ocb_drop_count) when there is insufficient space. Wakes
 * sleeping readers after publishing the new head. */
static ssize_t oal_write_entry(struct oal_circ_buf *ocb,
		const void *entry, size_t entry_size)
	struct ofd_access_log *oal = ocb->ocb_access_log;
	struct circ_buf *circ = &ocb->ocb_circ;

	/* Producers must supply exactly one whole entry. */
	if (entry_size != oal->oal_entry_size)

	spin_lock(&ocb->ocb_write_lock);

	tail = READ_ONCE(circ->tail);

	/* CIRC_SPACE() returns space available, 0..oal_log_size -
	 * 1. It always leaves one free char, since a completely full
	 * buffer would have head == tail, which is the same as empty. */
	if (CIRC_SPACE(head, tail, oal->oal_log_size) < oal->oal_entry_size) {
		ocb->ocb_drop_count++;

	memcpy(&circ->buf[head], entry, entry_size);

	/* Ensure the entry is stored before we update the head. */
	smp_store_release(&circ->head,
		(head + oal->oal_entry_size) & (oal->oal_log_size - 1));

	wake_up(&ocb->ocb_read_wait_queue);

	spin_unlock(&ocb->ocb_write_lock);
/* Read one entry from the log and return its size. Non-blocking.
 * When the log is empty we return -EAGAIN if the OST is still mounted
 * and 0 (EOF) once it has been closed. */
static ssize_t oal_read_entry(struct oal_circ_buf *ocb,
		void *entry_buf, size_t entry_buf_size)
	struct ofd_access_log *oal = ocb->ocb_access_log;
	struct circ_buf *circ = &ocb->ocb_circ;

	/* XXX This method may silently truncate entries when
	 * entry_buf_size is less than oal_entry_size. But that's OK
	 * because you know what you are doing. */
	spin_lock(&ocb->ocb_read_lock);

	/* Memory barrier usage follows circular-buffers.txt. */
	head = smp_load_acquire(&circ->head);

	if (!CIRC_CNT(head, tail, oal->oal_log_size)) {
		rc = oal->oal_is_closed ? 0 : -EAGAIN;

	/* Writers commit whole entries, so a non-empty buffer always
	 * holds at least one complete entry. */
	BUG_ON(CIRC_CNT(head, tail, oal->oal_log_size) < oal->oal_entry_size);

	/* Extract one entry from the buffer. */
	rc = min_t(size_t, oal->oal_entry_size, entry_buf_size);
	memcpy(entry_buf, &circ->buf[tail], rc);

	/* Memory barrier usage follows circular-buffers.txt. */
	smp_store_release(&circ->tail,
		(tail + oal->oal_entry_size) & (oal->oal_log_size - 1));

	spin_unlock(&ocb->ocb_read_lock);
/* open() on an oal char device: allocate a per-reader oal_circ_buf
 * backed by a vmalloc'd buffer of oal_log_size bytes and link it on
 * the oal's reader list so writers start feeding it. */
static int oal_file_open(struct inode *inode, struct file *filp)
	struct ofd_access_log *oal;
	struct oal_circ_buf *ocb;

	oal = container_of(inode->i_cdev, struct ofd_access_log, oal_cdev);

	ocb = kzalloc(sizeof(*ocb), GFP_KERNEL);

	ocb->ocb_circ.buf = vmalloc(oal->oal_log_size);
	if (!ocb->ocb_circ.buf) {

	spin_lock_init(&ocb->ocb_write_lock);
	spin_lock_init(&ocb->ocb_read_lock);
	ocb->ocb_access_log = oal;
	init_waitqueue_head(&ocb->ocb_read_wait_queue);

	down_write(&oal->oal_buf_list_sem);
	list_add(&ocb->ocb_list, &oal->oal_circ_buf_list);
	up_write(&oal->oal_buf_list_sem);

	filp->private_data = ocb;

	return nonseekable_open(inode, filp);
/* User buffer size must be a multiple of ofd access entry size. */
static ssize_t oal_file_read(struct file *filp, char __user *buf, size_t count,
	struct oal_circ_buf *ocb = filp->private_data;
	struct ofd_access_log *oal = ocb->ocb_access_log;

	/* Entry size is a power of 2, so this masks out partial entries. */
	if (count & (oal->oal_entry_size - 1))

	/* Bounce buffer: entries are staged here before copy_to_user(). */
	entry = kzalloc(oal->oal_entry_size, GFP_KERNEL);

	while (size < count) {
		rc = oal_read_entry(ocb, entry, oal->oal_entry_size);
		if (filp->f_flags & O_NONBLOCK)
			/* Blocking read: wait until data arrives or the log closes. */
			rc = wait_event_interruptible(ocb->ocb_read_wait_queue,
				!oal_is_empty(ocb) || oal->oal_is_closed);
		} else if (rc <= 0) {
			break; /* closed or error */
		if (copy_to_user(buf, entry, oal->oal_entry_size)) {

		buf += oal->oal_entry_size;
		size += oal->oal_entry_size;

	/* A partial read returns what we got; otherwise propagate rc. */
	return size ? size : rc;
/* Included for test purposes. User buffer size must be a multiple of
 * ofd access entry size. */
static ssize_t oal_file_write(struct file *filp, const char __user *buf,
		size_t count, loff_t *ppos)
	struct oal_circ_buf *ocb = filp->private_data;
	struct ofd_access_log *oal = ocb->ocb_access_log;

	/* Reject buffers that are not a whole number of entries. */
	if (count & (oal->oal_entry_size - 1))

	/* Bounce buffer for copy_from_user() staging. */
	entry = kzalloc(oal->oal_entry_size, GFP_KERNEL);

	while (size < count) {
		if (copy_from_user(entry, buf, oal->oal_entry_size)) {

		rc = oal_write_entry(ocb, entry, oal->oal_entry_size);

		buf += oal->oal_entry_size;
		size += oal->oal_entry_size;

	return size > 0 ? size : rc;
/* poll(): readable when at least one complete entry is buffered, or
 * when the log has been closed (so read() will report EOF). */
static unsigned int oal_file_poll(struct file *filp,
		struct poll_table_struct *wait)
	struct oal_circ_buf *ocb = filp->private_data;
	struct ofd_access_log *oal = ocb->ocb_access_log;
	unsigned int mask = 0;

	poll_wait(filp, &ocb->ocb_read_wait_queue, wait);

	spin_lock(&ocb->ocb_read_lock);

	if (!oal_is_empty(ocb) || oal->oal_is_closed)

	spin_unlock(&ocb->ocb_read_lock);
/* LUSTRE_ACCESS_LOG_IOCTL_INFO: fill a userspace struct
 * lustre_access_log_info_v1 with identification and diagnostic data
 * for this oal / reader. */
static long oal_ioctl_info(struct oal_circ_buf *ocb, void __user *uarg)
	struct ofd_access_log *oal = ocb->ocb_access_log;
	struct lustre_access_log_info_v1 __user *lali;
	/* Convert raw byte counts into whole-entry counts. */
	u32 entry_count = CIRC_CNT(ocb->ocb_circ.head,
		oal->oal_log_size) / oal->oal_entry_size;
	u32 entry_space = CIRC_SPACE(ocb->ocb_circ.head,
		oal->oal_log_size) / oal->oal_entry_size;

	BUILD_BUG_ON(sizeof(lali->lali_name) != sizeof(oal->oal_name));

	if (put_user(LUSTRE_ACCESS_LOG_VERSION_1, &lali->lali_version))

	if (put_user(LUSTRE_ACCESS_LOG_TYPE_OFD, &lali->lali_type))

	if (copy_to_user(lali->lali_name, oal->oal_name, sizeof(oal->oal_name)))

	if (put_user(oal->oal_log_size, &lali->lali_log_size))

	if (put_user(oal->oal_entry_size, &lali->lali_entry_size))

	if (put_user(ocb->ocb_circ.head, &lali->_lali_head))

	if (put_user(ocb->ocb_circ.tail, &lali->_lali_tail))

	if (put_user(entry_space, &lali->_lali_entry_space))

	if (put_user(entry_count, &lali->_lali_entry_count))

	if (put_user(ocb->ocb_drop_count, &lali->_lali_drop_count))

	if (put_user(oal->oal_is_closed, &lali->_lali_is_closed))
/* ioctl() on an oal device file: version query, diagnostic info dump,
 * and MDT index filter selection. */
static long oal_file_ioctl(struct file *filp, unsigned int cmd,
	struct oal_circ_buf *ocb = filp->private_data;

	case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
		return LUSTRE_ACCESS_LOG_VERSION_1;
	case LUSTRE_ACCESS_LOG_IOCTL_INFO:
		return oal_ioctl_info(ocb, (void __user *)arg);
	case LUSTRE_ACCESS_LOG_IOCTL_FILTER:
		/* Only receive entries for this MDT index (see ofd_access()). */
		ocb->ocb_filter = arg;
/* release(): unlink this reader from the oal and free its buffer. */
static int oal_file_release(struct inode *inode, struct file *filp)
	struct oal_circ_buf *ocb = filp->private_data;
	struct ofd_access_log *oal = ocb->ocb_access_log;

	down_write(&oal->oal_buf_list_sem);
	list_del(&ocb->ocb_list);
	up_write(&oal->oal_buf_list_sem);

	vfree(ocb->ocb_circ.buf);
/* File operations for the per-OST oal character devices. */
static const struct file_operations oal_fops = {
	.owner = THIS_MODULE,
	.open = &oal_file_open,
	.release = &oal_file_release,
	.unlocked_ioctl = &oal_file_ioctl,
	.read = &oal_file_read,
	.write = &oal_file_write,
	.poll = &oal_file_poll,
#ifdef HAVE_NO_LLSEEK
	.llseek = &no_llseek,
/* struct device release(): runs when the last reference to oal_device
 * is dropped; returns the minor to the idr. NOTE(review): freeing of
 * the oal itself presumably follows — tail not visible in this chunk. */
static void oal_device_release(struct device *dev)
	struct ofd_access_log *oal = dev_get_drvdata(dev);

	oal_log_minor_free(MINOR(oal->oal_device.devt));
	BUG_ON(!list_empty(&oal->oal_circ_buf_list));
/* Create and register the access log (struct device plus cdev) for
 * OST ofd_name, with a per-reader buffer of size bytes. Returns the
 * new oal or an ERR_PTR(). */
struct ofd_access_log *ofd_access_log_create(const char *ofd_name, size_t size)
	const size_t entry_size = sizeof(struct ofd_access_entry_v1);
	struct ofd_access_log *oal;

	BUILD_BUG_ON(sizeof(oal->oal_name) != MAX_OBD_NAME);
	BUILD_BUG_ON(!is_power_of_2(entry_size));

	/* size must be a power of 2, a multiple of the entry size, and
	 * must fit in an unsigned int (oal_log_size). */
	if (!is_power_of_2(size) || (size & (entry_size - 1)) ||
	    (unsigned int)size != size)
		return ERR_PTR(-EINVAL);

	oal = kzalloc(sizeof(*oal), GFP_KERNEL);
		return ERR_PTR(-ENOMEM);

	strscpy(oal->oal_name, ofd_name, sizeof(oal->oal_name));
	oal->oal_log_size = size;
	oal->oal_entry_size = entry_size;
	INIT_LIST_HEAD(&oal->oal_circ_buf_list);
	init_rwsem(&oal->oal_buf_list_sem);

	rc = oal_log_minor_alloc(&minor);

	/* The device owns the oal: oal_device_release() cleans up. */
	device_initialize(&oal->oal_device);
	oal->oal_device.devt = MKDEV(oal_log_major, minor);
	oal->oal_device.class = oal_log_class;
	oal->oal_device.release = &oal_device_release;
	dev_set_drvdata(&oal->oal_device, oal);
	/* '!' becomes '/' in /dev: lustre-access-log/<name>. */
	rc = dev_set_name(&oal->oal_device,
		"%s!%s", LUSTRE_ACCESS_LOG_DIR_NAME, oal->oal_name);

	cdev_init(&oal->oal_cdev, &oal_fops);
	oal->oal_cdev.owner = THIS_MODULE;
	rc = cdev_device_add(&oal->oal_cdev, &oal->oal_device);
		goto out_device_name;

	/* Tell control-device pollers that a new oal exists. */
	oal_control_event_inc();

	kfree_const(oal->oal_device.kobj.name);

	oal_log_minor_free(minor);
/* Log one OST read or write access (described by parent FID, extent,
 * and segment count) to every open reader of this OST's access log,
 * subject to the OFD's access log mask and each reader's MDT filter. */
void ofd_access(const struct lu_env *env,
		struct ofd_device *m,
		const struct lu_fid *parent_fid,
		__u64 begin, __u64 end,
		unsigned int segment_count,
	unsigned int flags = (rw == READ) ? OFD_ACCESS_READ : OFD_ACCESS_WRITE;
	struct ofd_access_log *oal = m->ofd_access_log;

	/* obdfilter-survey does not set parent FIDs. */
	if (fid_is_zero(parent_fid))

	if (oal && (flags & m->ofd_access_log_mask)) {
		struct ofd_access_entry_v1 oae = {
			.oae_parent_fid = *parent_fid,
			.oae_time = ktime_get_real_seconds(),
			.oae_segment_count = segment_count,
		struct lu_seq_range range = {
			.lsr_flags = LU_SEQ_RANGE_ANY,
		struct oal_circ_buf *ocb;

		/* learn target MDT from FID's sequence */
		rc = fld_server_lookup(env, m->ofd_seq_site.ss_server_fld,
			fid_seq(parent_fid), &range);
			CERROR("%s: can't resolve "DFID": rc=%d\n",
				ofd_name(m), PFID(parent_fid), rc);

		down_read(&oal->oal_buf_list_sem);
		list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list) {
			/* filter by MDT index if requested */
			if (ocb->ocb_filter == 0xffffffff ||
			    range.lsr_index == ocb->ocb_filter)
				oal_write_entry(ocb, &oae, sizeof(oae));
		up_read(&oal->oal_buf_list_sem);
/* Called on OST umount to:
 * - Close the write end of the oal. This wakes any tasks sleeping in
 *   read or poll and makes all reads return zero once the log
 *   has drained.
 * - Delete the associated struct device and cdev, preventing new
 *   opens. Existing opens retain a reference on the oal through
 *   their reference on oal_device.
 * The oal will be freed when the last open file handle is closed. */
void ofd_access_log_delete(struct ofd_access_log *oal)
	struct oal_circ_buf *ocb;

	oal->oal_is_closed = 1;
	/* Wake blocked readers so they notice oal_is_closed. */
	down_read(&oal->oal_buf_list_sem);
	list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list)
		wake_up(&ocb->ocb_read_wait_queue);
	up_read(&oal->oal_buf_list_sem);
	cdev_device_del(&oal->oal_cdev, &oal->oal_device);
/* private_data for control device file. Holds ccf_event_count, the
 * snapshot of oal_control_event_count taken by the PRESCAN ioctl that
 * poll() compares against the live counter. */
struct oal_control_file {
/* Control file usage:
 * Open /dev/lustre-access-log/control.
 * Poll for readable on control FD.
 * Call ioctl(FD, LUSTRE_ACCESS_LOG_IOCTL_PRESCAN) to fetch event count.
 * Scan /dev/ or /sys/class/... for new devices.
 */
static int oal_control_file_open(struct inode *inode, struct file *filp)
	struct oal_control_file *ccf;

	rc = nonseekable_open(inode, filp);

	/* ccf->ccf_event_count = 0 on open */
	ccf = kzalloc(sizeof(*ccf), GFP_KERNEL);

	filp->private_data = ccf;
/* release(): free the per-open control file state. */
static int oal_control_file_release(struct inode *inode, struct file *filp)
	kfree(filp->private_data);
/* poll(): readable when oal devices have come or gone since the last
 * PRESCAN ioctl on this FD. */
static unsigned int oal_control_file_poll(struct file *filp, poll_table *wait)
	struct oal_control_file *ccf = filp->private_data;
	unsigned int mask = 0;

	poll_wait(filp, &oal_control_wait_queue, wait);

	if (atomic_read(&oal_control_event_count) != ccf->ccf_event_count)
/* ioctl() on the control device: version query, oal chrdev major
 * query, and PRESCAN (snapshot the event count so poll() blocks until
 * the next oal create/delete). */
static long oal_control_file_ioctl(struct file *filp, unsigned int cmd,
	struct oal_control_file *ccf = filp->private_data;

	case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
		return LUSTRE_ACCESS_LOG_VERSION_1;
	case LUSTRE_ACCESS_LOG_IOCTL_MAJOR:
		return oal_log_major;
	case LUSTRE_ACCESS_LOG_IOCTL_PRESCAN:
		ccf->ccf_event_count = atomic_read(&oal_control_event_count);
/* File operations for /dev/lustre-access-log/control. */
static const struct file_operations oal_control_fops = {
	.owner = THIS_MODULE,
	.open = &oal_control_file_open,
	.release = &oal_control_file_release,
	.poll = &oal_control_file_poll,
	.unlocked_ioctl = &oal_control_file_ioctl,
	.llseek = &noop_llseek,
/* Misc device for the control file ('!' maps to '/' in /dev). */
static struct miscdevice oal_control_misc = {
	.minor = MISC_DYNAMIC_MINOR,
	.name = LUSTRE_ACCESS_LOG_DIR_NAME"!control",
	.fops = &oal_control_fops,
/* Module init: register the control misc device, reserve a chrdev
 * major range for oal devices, and create their device class. */
int ofd_access_log_module_init(void)
	/* The circ_buf index masking in write/read requires this. */
	BUILD_BUG_ON(!is_power_of_2(sizeof(struct ofd_access_entry_v1)));

	rc = misc_register(&oal_control_misc);

	rc = alloc_chrdev_region(&dev, 0, OAL_DEV_COUNT,
		LUSTRE_ACCESS_LOG_DIR_NAME);
		goto out_oal_control_misc;

	oal_log_major = MAJOR(dev);

	oal_log_class = ll_class_create(LUSTRE_ACCESS_LOG_DIR_NAME);
	if (IS_ERR(oal_log_class)) {
		rc = PTR_ERR(oal_log_class);

	unregister_chrdev_region(dev, OAL_DEV_COUNT);
out_oal_control_misc:
	misc_deregister(&oal_control_misc);
/* Module exit: tear down in reverse order of module init. */
void ofd_access_log_module_exit(void)
	class_destroy(oal_log_class);
	unregister_chrdev_region(MKDEV(oal_log_major, 0), OAL_DEV_COUNT);
	idr_destroy(&oal_log_minor_idr);
	misc_deregister(&oal_control_misc);