1 #include <linux/cdev.h>
2 #include <linux/circ_buf.h>
3 #include <linux/device.h>
5 #include <linux/kernel.h>
6 #include <linux/miscdevice.h>
7 #include <linux/module.h>
8 #include <linux/poll.h>
9 #include <linux/slab.h>
10 #include <linux/types.h>
11 #include <linux/uaccess.h>
12 #include <uapi/linux/lustre/lustre_idl.h>
13 #include <uapi/linux/lustre/lustre_access_log.h>
14 #include "ofd_internal.h"
16 /* OFD access logs: OST (OFD) RPC handlers log accesses by FID and
17 * PFID which are read from userspace through character device files
18 * (/dev/lustre-access-log/scratch-OST0000). Accesses are described by
19 * struct ofd_access_entry_v1. The char device implements read()
20 * (blocking and nonblocking) and poll(), along with an ioctl that
21 * returns diagnostic information on an oal device.
23 * A control device (/dev/lustre-access-log/control) supports an ioctl()
24 * plus poll() method for oal discovery. See uses of
25 * oal_control_event_count and oal_control_wait_queue for details.
27 * oal log size and entry size are restricted to powers of 2 to
28 * support circ_buf methods. See Documentation/core-api/circular-buffers.rst
29 * in the linux tree for more information.
31 * The associated struct device (*oal_device) owns the oal. The
32 * release() method of oal_device frees the oal and releases its
33 * minor. This may seem slightly more complicated than necessary but
34 * it allows the OST to be unmounted while the oal still has open file
39 OAL_DEV_COUNT = 1 << MINORBITS,
42 struct ofd_access_log {
/* Name of the backing OST, e.g. "lustre-OST0000"; also used for the
 * device node name. */
43 char oal_name[128]; /* lustre-OST0000 */
/* Embedded device; its release() callback frees the oal and its minor. */
44 struct device oal_device;
/* Protects oal_circ_buf_list. Taken for write on open/release, for
 * read when iterating readers. */
46 struct rw_semaphore oal_buf_list_sem;
/* One oal_circ_buf per open file handle on this log. */
47 struct list_head oal_circ_buf_list;
/* Set to 1 by ofd_access_log_delete() on OST umount; readers then
 * drain and see EOF. */
48 unsigned int oal_is_closed;
/* Circular buffer size in bytes; power of 2 (see header comment). */
49 unsigned int oal_log_size;
/* Size of one log entry in bytes; power of 2. */
50 unsigned int oal_entry_size;
54 struct list_head ocb_list; /* linkage into oal_circ_buf_list */
55 spinlock_t ocb_write_lock; /* serializes oal_write_entry() */
56 spinlock_t ocb_read_lock; /* serializes oal_read_entry() and poll() */
57 struct ofd_access_log *ocb_access_log; /* owning log */
/* Readers sleep here until data arrives or the log is closed. */
59 wait_queue_head_t ocb_read_wait_queue;
/* Count of entries dropped because the buffer was full. */
60 unsigned int ocb_drop_count;
/* head/tail indices plus the vmalloc'ed data buffer. */
61 struct circ_buf ocb_circ;
/* Discovery event counter and waitqueue serviced through the control
 * device (see oal_control_file_poll/ioctl below). */
64 static atomic_t oal_control_event_count = ATOMIC_INIT(0);
65 static DECLARE_WAIT_QUEUE_HEAD(oal_control_wait_queue);
/* Class and char-device major shared by all oal devices. */
67 static struct class *oal_log_class;
68 static unsigned int oal_log_major;
/* Minor number allocator for oal devices. */
69 static DEFINE_IDR(oal_log_minor_idr); /* TODO Use ida instead. */
70 static DEFINE_SPINLOCK(oal_log_minor_lock);
/* Validate a requested log size: must be a power of 2, large enough to
 * hold at least two entries, and no larger than 1 GiB. */
72 bool ofd_access_log_size_is_valid(unsigned int size)
74 const unsigned int size_min = 2 * sizeof(struct ofd_access_entry_v1);
75 const unsigned int size_max = 1U << 30;
80 return is_power_of_2(size) && size_min <= size && size <= size_max;
/* Bump the discovery event count and wake any control-device pollers. */
83 static void oal_control_event_inc(void)
85 atomic_inc(&oal_control_event_count);
86 wake_up(&oal_control_wait_queue);
/* Allocate a free device minor for a new oal.  idr_preload() stocks the
 * idr outside the lock so the GFP_NOWAIT allocation under the spinlock
 * cannot fail for lack of memory. */
89 static int oal_log_minor_alloc(int *pminor)
/* Sentinel pointer stored in the idr; only slot occupancy matters. */
91 void *OAL_LOG_MINOR_ALLOCED = (void *)-1;
94 idr_preload(GFP_KERNEL);
95 spin_lock(&oal_log_minor_lock);
96 minor = idr_alloc(&oal_log_minor_idr, OAL_LOG_MINOR_ALLOCED, 0,
97 OAL_DEV_COUNT, GFP_NOWAIT);
98 spin_unlock(&oal_log_minor_lock);
/* Return a minor previously allocated by oal_log_minor_alloc(). */
109 static void oal_log_minor_free(int minor)
111 spin_lock(&oal_log_minor_lock);
112 idr_remove(&oal_log_minor_idr, minor);
113 spin_unlock(&oal_log_minor_lock);
/* True when fewer than one whole entry is available to read from ocb. */
116 static bool oal_is_empty(struct oal_circ_buf *ocb)
118 struct ofd_access_log *oal = ocb->ocb_access_log;
120 return CIRC_CNT(ocb->ocb_circ.head,
122 oal->oal_log_size) < oal->oal_entry_size;
/* Copy one entry into ocb's circular buffer.  If there is not enough
 * free space the entry is dropped and ocb_drop_count is incremented.
 * Serialized by ocb_write_lock; wakes blocked readers after a
 * successful store. */
125 static ssize_t oal_write_entry(struct oal_circ_buf *ocb,
126 const void *entry, size_t entry_size)
128 struct ofd_access_log *oal = ocb->ocb_access_log;
129 struct circ_buf *circ = &ocb->ocb_circ;
/* Only fixed-size entries are supported. */
134 if (entry_size != oal->oal_entry_size)
137 spin_lock(&ocb->ocb_write_lock);
139 tail = READ_ONCE(circ->tail);
141 /* CIRC_SPACE() returns space available, 0..oal_log_size -
142 * 1. It always leaves one free char, since a completely full
143 * buffer would have head == tail, which is the same as empty. */
144 if (CIRC_SPACE(head, tail, oal->oal_log_size) < oal->oal_entry_size) {
145 ocb->ocb_drop_count++;
150 memcpy(&circ->buf[head], entry, entry_size);
153 /* Ensure the entry is stored before we update the head. */
154 smp_store_release(&circ->head,
155 (head + oal->oal_entry_size) & (oal->oal_log_size - 1));
157 wake_up(&ocb->ocb_read_wait_queue);
159 spin_unlock(&ocb->ocb_write_lock);
164 /* Read one entry from the log and return its size. Non-blocking.
165 * When the log is empty we return -EAGAIN if the OST is still mounted
 * and 0 (EOF) once the oal has been closed. */
168 static ssize_t oal_read_entry(struct oal_circ_buf *ocb,
169 void *entry_buf, size_t entry_buf_size)
171 struct ofd_access_log *oal = ocb->ocb_access_log;
172 struct circ_buf *circ = &ocb->ocb_circ;
177 /* XXX This method may silently truncate entries when
178 * entry_buf_size is less than oal_entry_size. But that's OK
179 * because you know what you are doing. */
180 spin_lock(&ocb->ocb_read_lock);
182 /* Memory barrier usage follows circular-buffers.txt. */
183 head = smp_load_acquire(&circ->head);
/* Empty: EOF when closed, otherwise tell the caller to retry. */
186 if (!CIRC_CNT(head, tail, oal->oal_log_size)) {
187 rc = oal->oal_is_closed ? 0 : -EAGAIN;
/* Writers only advance head in whole entries, so a non-empty buffer
 * must hold at least one full entry. */
191 BUG_ON(CIRC_CNT(head, tail, oal->oal_log_size) < oal->oal_entry_size);
193 /* Read index before reading contents at that index. */
/* NOTE(review): smp_read_barrier_depends() was removed upstream in
 * v5.9 (folded into READ_ONCE()); confirm the target kernel still
 * provides it. */
194 smp_read_barrier_depends();
196 /* Extract one entry from the buffer. */
197 rc = min_t(size_t, oal->oal_entry_size, entry_buf_size);
198 memcpy(entry_buf, &circ->buf[tail], rc);
200 /* Memory barrier usage follows circular-buffers.txt. */
201 smp_store_release(&circ->tail,
202 (tail + oal->oal_entry_size) & (oal->oal_log_size - 1));
205 spin_unlock(&ocb->ocb_read_lock);
/* open() for an oal device: allocate a private circular buffer for this
 * file handle and link it onto the log's reader list. */
210 static int oal_file_open(struct inode *inode, struct file *filp)
212 struct ofd_access_log *oal;
213 struct oal_circ_buf *ocb;
/* Recover the owning oal from the embedded cdev. */
215 oal = container_of(inode->i_cdev, struct ofd_access_log, oal_cdev);
217 ocb = kzalloc(sizeof(*ocb), GFP_KERNEL);
/* Data buffer may be large (up to 1 GiB), hence vmalloc. */
220 ocb->ocb_circ.buf = vmalloc(oal->oal_log_size);
221 if (!ocb->ocb_circ.buf) {
226 spin_lock_init(&ocb->ocb_write_lock);
227 spin_lock_init(&ocb->ocb_read_lock);
228 ocb->ocb_access_log = oal;
229 init_waitqueue_head(&ocb->ocb_read_wait_queue);
/* Publish the new reader so ofd_access() starts feeding it. */
231 down_write(&oal->oal_buf_list_sem);
232 list_add(&ocb->ocb_list, &oal->oal_circ_buf_list);
233 up_write(&oal->oal_buf_list_sem);
235 filp->private_data = ocb;
237 return nonseekable_open(inode, filp);
240 /* User buffer size must be a multiple of ofd access entry size. */
241 static ssize_t oal_file_read(struct file *filp, char __user *buf, size_t count,
244 struct oal_circ_buf *ocb = filp->private_data;
245 struct ofd_access_log *oal = ocb->ocb_access_log;
/* Reject counts that are not a whole number of entries
 * (entry size is a power of 2). */
253 if (count & (oal->oal_entry_size - 1))
/* Bounce buffer for one entry; copied to userspace below. */
256 entry = kzalloc(oal->oal_entry_size, GFP_KERNEL);
260 while (size < count) {
261 rc = oal_read_entry(ocb, entry, oal->oal_entry_size);
263 if (filp->f_flags & O_NONBLOCK)
/* Blocking read: sleep until data arrives or the log closes. */
266 rc = wait_event_interruptible(ocb->ocb_read_wait_queue,
267 !oal_is_empty(ocb) || oal->oal_is_closed);
270 } else if (rc <= 0) {
271 break; /* closed or error */
273 if (copy_to_user(buf, entry, oal->oal_entry_size)) {
278 buf += oal->oal_entry_size;
279 size += oal->oal_entry_size;
/* Return bytes delivered, or the first error if nothing was read. */
285 return size ? size : rc;
288 /* Included for test purposes. User buffer size must be a multiple of
289 * ofd access entry size. */
290 static ssize_t oal_file_write(struct file *filp, const char __user *buf,
291 size_t count, loff_t *ppos)
293 struct oal_circ_buf *ocb = filp->private_data;
294 struct ofd_access_log *oal = ocb->ocb_access_log;
/* Reject counts that are not a whole number of entries. */
302 if (count & (oal->oal_entry_size - 1))
/* Bounce buffer for one entry copied from userspace. */
305 entry = kzalloc(oal->oal_entry_size, GFP_KERNEL);
309 while (size < count) {
310 if (copy_from_user(entry, buf, oal->oal_entry_size)) {
315 rc = oal_write_entry(ocb, entry, oal->oal_entry_size);
319 buf += oal->oal_entry_size;
320 size += oal->oal_entry_size;
/* Return bytes consumed, or the first error if nothing was written. */
325 return size > 0 ? size : rc;
/* poll() for an oal device: readable when data is buffered or the log
 * has been closed (so readers can observe EOF). */
328 unsigned int oal_file_poll(struct file *filp, struct poll_table_struct *wait)
330 struct oal_circ_buf *ocb = filp->private_data;
331 struct ofd_access_log *oal = ocb->ocb_access_log;
332 unsigned int mask = 0;
334 poll_wait(filp, &ocb->ocb_read_wait_queue, wait);
336 spin_lock(&ocb->ocb_read_lock);
338 if (!oal_is_empty(ocb) || oal->oal_is_closed)
341 spin_unlock(&ocb->ocb_read_lock);
/* LUSTRE_ACCESS_LOG_IOCTL_INFO handler: fill the user's
 * struct lustre_access_log_info_v1 with diagnostic state of this
 * reader (entry counts, head/tail, drops, closed flag). */
346 static long oal_ioctl_info(struct oal_circ_buf *ocb, unsigned long arg)
348 struct ofd_access_log *oal = ocb->ocb_access_log;
350 struct lustre_access_log_info_v1 __user *lali;
/* Whole entries currently buffered / still storable. */
351 u32 entry_count = CIRC_CNT(ocb->ocb_circ.head,
353 oal->oal_log_size) / oal->oal_entry_size;
354 u32 entry_space = CIRC_SPACE(ocb->ocb_circ.head,
356 oal->oal_log_size) / oal->oal_entry_size;
358 lali = (struct lustre_access_log_info_v1 __user *)arg;
359 BUILD_BUG_ON(sizeof(lali->lali_name) != sizeof(oal->oal_name));
361 if (put_user(LUSTRE_ACCESS_LOG_VERSION_1, &lali->lali_version))
364 if (put_user(LUSTRE_ACCESS_LOG_TYPE_OFD, &lali->lali_type))
367 if (copy_to_user(lali->lali_name, oal->oal_name, sizeof(oal->oal_name)))
370 if (put_user(oal->oal_log_size, &lali->lali_log_size))
373 if (put_user(oal->oal_entry_size, &lali->lali_entry_size))
376 if (put_user(ocb->ocb_circ.head, &lali->_lali_head))
379 if (put_user(ocb->ocb_circ.tail, &lali->_lali_tail))
382 if (put_user(entry_space, &lali->_lali_entry_space))
385 if (put_user(entry_count, &lali->_lali_entry_count))
388 if (put_user(ocb->ocb_drop_count, &lali->_lali_drop_count))
391 if (put_user(oal->oal_is_closed, &lali->_lali_is_closed))
/* ioctl() dispatch for an oal device file. */
397 static long oal_file_ioctl(struct file *filp, unsigned int cmd,
400 struct oal_circ_buf *ocb = filp->private_data;
403 case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
404 return LUSTRE_ACCESS_LOG_VERSION_1;
405 case LUSTRE_ACCESS_LOG_IOCTL_INFO:
406 return oal_ioctl_info(ocb, arg);
407 case LUSTRE_ACCESS_LOG_IOCTL_FILTER:
/* MDT-index filter applied in ofd_access(); see its caller loop. */
408 ocb->ocb_filter = arg;
/* release() for an oal device file: unlink this reader from the log and
 * free its buffer. */
415 static int oal_file_release(struct inode *inode, struct file *filp)
417 struct oal_circ_buf *ocb = filp->private_data;
418 struct ofd_access_log *oal = ocb->ocb_access_log;
420 down_write(&oal->oal_buf_list_sem);
421 list_del(&ocb->ocb_list);
422 up_write(&oal->oal_buf_list_sem);
424 vfree(ocb->ocb_circ.buf);
/* File operations for per-OST oal device files
 * (/dev/lustre-access-log/<name>). */
430 static const struct file_operations oal_fops = {
431 .owner = THIS_MODULE,
432 .open = &oal_file_open,
433 .release = &oal_file_release,
434 .unlocked_ioctl = &oal_file_ioctl,
435 .read = &oal_file_read,
436 .write = &oal_file_write,
437 .poll = &oal_file_poll,
438 .llseek = &no_llseek,
/* device release(): called when the last reference on oal_device is
 * dropped; frees the minor (and the oal — see header comment).  The
 * reader list must already be empty at this point. */
441 static void oal_device_release(struct device *dev)
443 struct ofd_access_log *oal = dev_get_drvdata(dev);
445 oal_log_minor_free(MINOR(oal->oal_device.devt));
446 BUG_ON(!list_empty(&oal->oal_circ_buf_list));
/* Create and register the access log for ofd_name with a circular
 * buffer of the given size (bytes; must be a power of 2 and a multiple
 * of the entry size).  Returns the new oal or an ERR_PTR(). */
450 struct ofd_access_log *ofd_access_log_create(const char *ofd_name, size_t size)
452 const size_t entry_size = sizeof(struct ofd_access_entry_v1);
453 struct ofd_access_log *oal;
457 BUILD_BUG_ON(sizeof(oal->oal_name) != MAX_OBD_NAME);
458 BUILD_BUG_ON(!is_power_of_2(entry_size));
/* Size must be a power of 2, entry-aligned, and fit in unsigned int
 * (oal_log_size is unsigned int). */
463 if (!is_power_of_2(size) || (size & (entry_size - 1)) ||
464 (unsigned int)size != size)
465 return ERR_PTR(-EINVAL);
467 oal = kzalloc(sizeof(*oal), GFP_KERNEL);
469 return ERR_PTR(-ENOMEM);
471 strlcpy(oal->oal_name, ofd_name, sizeof(oal->oal_name));
472 oal->oal_log_size = size;
473 oal->oal_entry_size = entry_size;
474 INIT_LIST_HEAD(&oal->oal_circ_buf_list);
475 init_rwsem(&oal->oal_buf_list_sem);
477 rc = oal_log_minor_alloc(&minor);
481 device_initialize(&oal->oal_device);
482 oal->oal_device.devt = MKDEV(oal_log_major, minor);
483 oal->oal_device.class = oal_log_class;
484 oal->oal_device.release = &oal_device_release;
485 dev_set_drvdata(&oal->oal_device, oal);
/* '!' becomes '/' in the devtmpfs node name, giving
 * /dev/lustre-access-log/<name>. */
486 rc = dev_set_name(&oal->oal_device,
487 "%s!%s", LUSTRE_ACCESS_LOG_DIR_NAME, oal->oal_name);
491 cdev_init(&oal->oal_cdev, &oal_fops);
492 oal->oal_cdev.owner = THIS_MODULE;
493 rc = cdev_device_add(&oal->oal_cdev, &oal->oal_device);
495 goto out_device_name;
/* Notify control-device pollers that a new oal exists. */
497 oal_control_event_inc();
/* Error unwinding. */
502 kfree_const(oal->oal_device.kobj.name);
504 oal_log_minor_free(minor);
/* Log one FID access (read or write) from an OFD RPC handler.  Builds
 * an ofd_access_entry_v1 and appends it to every open reader of the
 * OST's access log, subject to each reader's MDT-index filter. */
511 void ofd_access(const struct lu_env *env,
512 struct ofd_device *m,
513 const struct lu_fid *parent_fid,
514 __u64 begin, __u64 end,
516 unsigned int segment_count,
519 unsigned int flags = (rw == READ) ? OFD_ACCESS_READ : OFD_ACCESS_WRITE;
520 struct ofd_access_log *oal = m->ofd_access_log;
/* Only log when a log exists and this access type is enabled. */
522 if (oal && (flags & m->ofd_access_log_mask)) {
523 struct ofd_access_entry_v1 oae = {
524 .oae_parent_fid = *parent_fid,
527 .oae_time = ktime_get_real_seconds(),
529 .oae_segment_count = segment_count,
532 struct oal_circ_buf *ocb;
533 struct lu_seq_range range;
536 /* learn target MDT from FID's sequence */
537 range.lsr_flags = LU_SEQ_RANGE_ANY;
538 rc = fld_server_lookup(env, m->ofd_seq_site.ss_server_fld,
539 fid_seq(parent_fid), &range);
541 CERROR("%s: can't resolve "DFID": rc=%d\n",
542 ofd_name(m), PFID(parent_fid), rc);
/* Fan the entry out to all open readers. */
544 down_read(&oal->oal_buf_list_sem);
545 list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list) {
546 /* filter by MDT index if requested */
547 if (ocb->ocb_filter == 0xffffffff ||
548 range.lsr_index == ocb->ocb_filter)
549 oal_write_entry(ocb, &oae, sizeof(oae));
551 up_read(&oal->oal_buf_list_sem);
555 /* Called on OST umount to:
556 * - Close the write end of the oal. This wakes any tasks sleeping in
557 * read or poll and makes all reads return zero once the log
559 * - Delete the associated struct device and cdev, preventing new
560 * opens. Existing opens retain a reference on the oal through
561 * their reference on oal_device.
562 * The oal will be freed when the last open file handle is closed. */
563 void ofd_access_log_delete(struct ofd_access_log *oal)
565 struct oal_circ_buf *ocb;
570 oal->oal_is_closed = 1;
/* Wake every sleeping reader so it can observe oal_is_closed. */
571 down_read(&oal->oal_buf_list_sem);
572 list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list) {
573 wake_up_all(&ocb->ocb_read_wait_queue);
575 up_read(&oal->oal_buf_list_sem);
576 cdev_device_del(&oal->oal_cdev, &oal->oal_device);
579 /* private_data for control device file. */
580 struct oal_control_file {
584 /* Control file usage:
585 * Open /dev/lustre-access-log/control.
587 * Poll for readable on control FD.
588 * Call ioctl(FD, LUSTRE_ACCESS_LOG_IOCTL_PRESCAN) to fetch event count.
589 * Scan /dev/ or /sys/class/... for new devices.
/* open() for the control device: allocate per-file state holding the
 * last event count seen by this handle. */
591 static int oal_control_file_open(struct inode *inode, struct file *filp)
593 struct oal_control_file *ccf;
596 rc = nonseekable_open(inode, filp);
600 /* ccf->ccf_event_count = 0 on open */
601 ccf = kzalloc(sizeof(*ccf), GFP_KERNEL);
605 filp->private_data = ccf;
/* release() for the control device: free the per-file state. */
610 static int oal_control_file_release(struct inode *inode, struct file *filp)
612 kfree(filp->private_data);
/* poll() for the control device: readable when events have occurred
 * since this handle's last PRESCAN ioctl. */
616 static unsigned int oal_control_file_poll(struct file *filp, poll_table *wait)
618 struct oal_control_file *ccf = filp->private_data;
619 unsigned int mask = 0;
621 poll_wait(filp, &oal_control_wait_queue, wait);
623 if (atomic_read(&oal_control_event_count) != ccf->ccf_event_count)
/* ioctl() dispatch for the control device. */
629 static long oal_control_file_ioctl(struct file *filp, unsigned int cmd,
632 struct oal_control_file *ccf = filp->private_data;
635 case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
636 return LUSTRE_ACCESS_LOG_VERSION_1;
637 case LUSTRE_ACCESS_LOG_IOCTL_MAJOR:
638 return oal_log_major;
639 case LUSTRE_ACCESS_LOG_IOCTL_PRESCAN:
/* Record the current event count; poll() compares against it. */
640 ccf->ccf_event_count = atomic_read(&oal_control_event_count);
/* File operations for /dev/lustre-access-log/control. */
647 static const struct file_operations oal_control_fops = {
648 .owner = THIS_MODULE,
649 .open = &oal_control_file_open,
650 .release = &oal_control_file_release,
651 .poll = &oal_control_file_poll,
652 .unlocked_ioctl = &oal_control_file_ioctl,
653 .llseek = &noop_llseek,
/* Misc device for the control node; '!' in the name becomes '/' under
 * devtmpfs, giving /dev/lustre-access-log/control. */
656 static struct miscdevice oal_control_misc = {
657 .minor = MISC_DYNAMIC_MINOR,
658 .name = LUSTRE_ACCESS_LOG_DIR_NAME"!control",
659 .fops = &oal_control_fops,
/* Module init: register the control misc device, reserve a char-device
 * region for oal devices, and create the device class. */
662 int ofd_access_log_module_init(void)
/* The circular-buffer math assumes power-of-2 entries. */
667 BUILD_BUG_ON(!is_power_of_2(sizeof(struct ofd_access_entry_v1)));
669 rc = misc_register(&oal_control_misc);
673 rc = alloc_chrdev_region(&dev, 0, OAL_DEV_COUNT,
674 LUSTRE_ACCESS_LOG_DIR_NAME);
676 goto out_oal_control_misc;
678 oal_log_major = MAJOR(dev);
680 oal_log_class = class_create(THIS_MODULE, LUSTRE_ACCESS_LOG_DIR_NAME);
681 if (IS_ERR(oal_log_class)) {
682 rc = PTR_ERR(oal_log_class);
/* Error unwinding. */
688 unregister_chrdev_region(dev, OAL_DEV_COUNT);
689 out_oal_control_misc:
690 misc_deregister(&oal_control_misc);
/* Module exit: tear down everything set up by
 * ofd_access_log_module_init(), in reverse order. */
695 void ofd_access_log_module_exit(void)
697 class_destroy(oal_log_class);
698 unregister_chrdev_region(MKDEV(oal_log_major, 0), OAL_DEV_COUNT);
699 idr_destroy(&oal_log_minor_idr);
700 misc_deregister(&oal_control_misc);