char oal_name[128]; /* lustre-OST0000 */
struct device oal_device;
struct cdev oal_cdev;
- struct circ_buf oal_circ;
- wait_queue_head_t oal_read_wait_queue;
- spinlock_t oal_read_lock;
- spinlock_t oal_write_lock;
- unsigned int oal_drop_count;
+ struct rw_semaphore oal_buf_list_sem;
+ struct list_head oal_circ_buf_list;
unsigned int oal_is_closed;
unsigned int oal_log_size;
unsigned int oal_entry_size;
};
+struct oal_circ_buf {
+ struct list_head ocb_list;
+ spinlock_t ocb_write_lock;
+ spinlock_t ocb_read_lock;
+ struct ofd_access_log *ocb_access_log;
+ __u32 ocb_filter;
+ wait_queue_head_t ocb_read_wait_queue;
+ unsigned int ocb_drop_count;
+ struct circ_buf ocb_circ;
+};
+
static atomic_t oal_control_event_count = ATOMIC_INIT(0);
static DECLARE_WAIT_QUEUE_HEAD(oal_control_wait_queue);
spin_unlock(&oal_log_minor_lock);
}
-static bool oal_is_empty(struct ofd_access_log *oal)
+static bool oal_is_empty(struct oal_circ_buf *ocb)
{
- return CIRC_CNT(oal->oal_circ.head,
- oal->oal_circ.tail,
+ struct ofd_access_log *oal = ocb->ocb_access_log;
+
+ return CIRC_CNT(ocb->ocb_circ.head,
+ ocb->ocb_circ.tail,
oal->oal_log_size) < oal->oal_entry_size;
}
-static ssize_t oal_write_entry(struct ofd_access_log *oal,
+static ssize_t oal_write_entry(struct oal_circ_buf *ocb,
const void *entry, size_t entry_size)
{
- struct circ_buf *circ = &oal->oal_circ;
+ struct ofd_access_log *oal = ocb->ocb_access_log;
+ struct circ_buf *circ = &ocb->ocb_circ;
unsigned int head;
unsigned int tail;
ssize_t rc;
if (entry_size != oal->oal_entry_size)
return -EINVAL;
- spin_lock(&oal->oal_write_lock);
+ spin_lock(&ocb->ocb_write_lock);
head = circ->head;
tail = READ_ONCE(circ->tail);
* 1. It always leaves one free char, since a completely full
* buffer would have head == tail, which is the same as empty. */
if (CIRC_SPACE(head, tail, oal->oal_log_size) < oal->oal_entry_size) {
- oal->oal_drop_count++;
+ ocb->ocb_drop_count++;
rc = -EAGAIN;
goto out_write_lock;
}
smp_store_release(&circ->head,
(head + oal->oal_entry_size) & (oal->oal_log_size - 1));
- wake_up(&oal->oal_read_wait_queue);
+ wake_up(&ocb->ocb_read_wait_queue);
out_write_lock:
- spin_unlock(&oal->oal_write_lock);
+ spin_unlock(&ocb->ocb_write_lock);
return rc;
}
* When the log is empty we return -EAGAIN if the OST is still mounted
* and 0 otherwise.
*/
-static ssize_t oal_read_entry(struct ofd_access_log *oal,
+static ssize_t oal_read_entry(struct oal_circ_buf *ocb,
void *entry_buf, size_t entry_buf_size)
{
- struct circ_buf *circ = &oal->oal_circ;
+ struct ofd_access_log *oal = ocb->ocb_access_log;
+ struct circ_buf *circ = &ocb->ocb_circ;
unsigned int head;
unsigned int tail;
ssize_t rc;
/* XXX This method may silently truncate entries when
* entry_buf_size is less than oal_entry_size. But that's OK
* because you know what you are doing. */
- spin_lock(&oal->oal_read_lock);
+ spin_lock(&ocb->ocb_read_lock);
/* Memory barrier usage follows circular-buffers.txt. */
head = smp_load_acquire(&circ->head);
(tail + oal->oal_entry_size) & (oal->oal_log_size - 1));
out_read_lock:
- spin_unlock(&oal->oal_read_lock);
+ spin_unlock(&ocb->ocb_read_lock);
return rc;
}
static int oal_file_open(struct inode *inode, struct file *filp)
{
- filp->private_data = container_of(inode->i_cdev,
- struct ofd_access_log, oal_cdev);
+ struct ofd_access_log *oal;
+ struct oal_circ_buf *ocb;
+
+ oal = container_of(inode->i_cdev, struct ofd_access_log, oal_cdev);
+
+ ocb = kzalloc(sizeof(*ocb), GFP_KERNEL);
+ if (!ocb)
+ return -ENOMEM;
+ ocb->ocb_circ.buf = vmalloc(oal->oal_log_size);
+ if (!ocb->ocb_circ.buf) {
+ kfree(ocb);
+ return -ENOMEM;
+ }
+
+ spin_lock_init(&ocb->ocb_write_lock);
+ spin_lock_init(&ocb->ocb_read_lock);
+ ocb->ocb_access_log = oal;
+ init_waitqueue_head(&ocb->ocb_read_wait_queue);
+
+ down_write(&oal->oal_buf_list_sem);
+ list_add(&ocb->ocb_list, &oal->oal_circ_buf_list);
+ up_write(&oal->oal_buf_list_sem);
+
+ filp->private_data = ocb;
return nonseekable_open(inode, filp);
}
static ssize_t oal_file_read(struct file *filp, char __user *buf, size_t count,
loff_t *ppos)
{
- struct ofd_access_log *oal = filp->private_data;
+ struct oal_circ_buf *ocb = filp->private_data;
+ struct ofd_access_log *oal = ocb->ocb_access_log;
void *entry;
size_t size = 0;
int rc = 0;
return -ENOMEM;
while (size < count) {
- rc = oal_read_entry(oal, entry, oal->oal_entry_size);
+ rc = oal_read_entry(ocb, entry, oal->oal_entry_size);
if (rc == -EAGAIN) {
if (filp->f_flags & O_NONBLOCK)
break;
- rc = wait_event_interruptible(oal->oal_read_wait_queue,
- !oal_is_empty(oal) || oal->oal_is_closed);
+ rc = wait_event_interruptible(ocb->ocb_read_wait_queue,
+ !oal_is_empty(ocb) || oal->oal_is_closed);
if (rc)
break;
} else if (rc <= 0) {
static ssize_t oal_file_write(struct file *filp, const char __user *buf,
size_t count, loff_t *ppos)
{
- struct ofd_access_log *oal = filp->private_data;
+ struct oal_circ_buf *ocb = filp->private_data;
+ struct ofd_access_log *oal = ocb->ocb_access_log;
void *entry;
size_t size = 0;
ssize_t rc = 0;
break;
}
- rc = oal_write_entry(oal, entry, oal->oal_entry_size);
+ rc = oal_write_entry(ocb, entry, oal->oal_entry_size);
if (rc <= 0)
break;
unsigned int oal_file_poll(struct file *filp, struct poll_table_struct *wait)
{
- struct ofd_access_log *oal = filp->private_data;
+ struct oal_circ_buf *ocb = filp->private_data;
+ struct ofd_access_log *oal = ocb->ocb_access_log;
unsigned int mask = 0;
- poll_wait(filp, &oal->oal_read_wait_queue, wait);
+ poll_wait(filp, &ocb->ocb_read_wait_queue, wait);
- spin_lock(&oal->oal_read_lock);
+ spin_lock(&ocb->ocb_read_lock);
- if (!oal_is_empty(oal) || oal->oal_is_closed)
+ if (!oal_is_empty(ocb) || oal->oal_is_closed)
mask |= POLLIN;
- spin_unlock(&oal->oal_read_lock);
+ spin_unlock(&ocb->ocb_read_lock);
return mask;
}
-static long oal_ioctl_info(struct ofd_access_log *oal, unsigned long arg)
+static long oal_ioctl_info(struct oal_circ_buf *ocb, unsigned long arg)
{
+ struct ofd_access_log *oal = ocb->ocb_access_log;
+
struct lustre_access_log_info_v1 __user *lali;
- u32 entry_count = CIRC_CNT(oal->oal_circ.head,
- oal->oal_circ.tail,
+ u32 entry_count = CIRC_CNT(ocb->ocb_circ.head,
+ ocb->ocb_circ.tail,
oal->oal_log_size) / oal->oal_entry_size;
- u32 entry_space = CIRC_SPACE(oal->oal_circ.head,
- oal->oal_circ.tail,
+ u32 entry_space = CIRC_SPACE(ocb->ocb_circ.head,
+ ocb->ocb_circ.tail,
oal->oal_log_size) / oal->oal_entry_size;
lali = (struct lustre_access_log_info_v1 __user *)arg;
if (put_user(oal->oal_entry_size, &lali->lali_entry_size))
return -EFAULT;
- if (put_user(oal->oal_circ.head, &lali->_lali_head))
+ if (put_user(ocb->ocb_circ.head, &lali->_lali_head))
return -EFAULT;
- if (put_user(oal->oal_circ.tail, &lali->_lali_tail))
+ if (put_user(ocb->ocb_circ.tail, &lali->_lali_tail))
return -EFAULT;
if (put_user(entry_space, &lali->_lali_entry_space))
if (put_user(entry_count, &lali->_lali_entry_count))
return -EFAULT;
- if (put_user(oal->oal_drop_count, &lali->_lali_drop_count))
+ if (put_user(ocb->ocb_drop_count, &lali->_lali_drop_count))
return -EFAULT;
if (put_user(oal->oal_is_closed, &lali->_lali_is_closed))
static long oal_file_ioctl(struct file *filp, unsigned int cmd,
unsigned long arg)
{
- struct ofd_access_log *oal = filp->private_data;
+ struct oal_circ_buf *ocb = filp->private_data;
switch (cmd) {
case LUSTRE_ACCESS_LOG_IOCTL_VERSION:
return LUSTRE_ACCESS_LOG_VERSION_1;
case LUSTRE_ACCESS_LOG_IOCTL_INFO:
- return oal_ioctl_info(oal, arg);
+ return oal_ioctl_info(ocb, arg);
+ case LUSTRE_ACCESS_LOG_IOCTL_FILTER:
+ ocb->ocb_filter = arg;
+ return 0;
default:
return -ENOTTY;
}
}
+static int oal_file_release(struct inode *inode, struct file *filp)
+{
+ struct oal_circ_buf *ocb = filp->private_data;
+ struct ofd_access_log *oal = ocb->ocb_access_log;
+
+ down_write(&oal->oal_buf_list_sem);
+ list_del(&ocb->ocb_list);
+ up_write(&oal->oal_buf_list_sem);
+
+ vfree(ocb->ocb_circ.buf);
+ kfree(ocb);
+
+ return 0;
+}
+
static const struct file_operations oal_fops = {
.owner = THIS_MODULE,
.open = &oal_file_open,
+ .release = &oal_file_release,
.unlocked_ioctl = &oal_file_ioctl,
.read = &oal_file_read,
.write = &oal_file_write,
struct ofd_access_log *oal = dev_get_drvdata(dev);
oal_log_minor_free(MINOR(oal->oal_device.devt));
- vfree(oal->oal_circ.buf);
+ BUG_ON(!list_empty(&oal->oal_circ_buf_list));
kfree(oal);
}
strlcpy(oal->oal_name, ofd_name, sizeof(oal->oal_name));
oal->oal_log_size = size;
oal->oal_entry_size = entry_size;
- spin_lock_init(&oal->oal_write_lock);
- spin_lock_init(&oal->oal_read_lock);
- init_waitqueue_head(&oal->oal_read_wait_queue);
-
- oal->oal_circ.buf = vmalloc(oal->oal_log_size);
- if (!oal->oal_circ.buf) {
- rc = -ENOMEM;
- goto out_free;
- }
+ INIT_LIST_HEAD(&oal->oal_circ_buf_list);
+ init_rwsem(&oal->oal_buf_list_sem);
rc = oal_log_minor_alloc(&minor);
if (rc < 0)
out_minor:
oal_log_minor_free(minor);
out_free:
- vfree(oal->oal_circ.buf);
kfree(oal);
return ERR_PTR(rc);
}
-void ofd_access(struct ofd_device *m,
+void ofd_access(const struct lu_env *env,
+ struct ofd_device *m,
const struct lu_fid *parent_fid,
__u64 begin, __u64 end,
unsigned int size,
int rw)
{
unsigned int flags = (rw == READ) ? OFD_ACCESS_READ : OFD_ACCESS_WRITE;
+ struct ofd_access_log *oal = m->ofd_access_log;
- if (m->ofd_access_log && (flags & m->ofd_access_log_mask)) {
+ if (oal && (flags & m->ofd_access_log_mask)) {
struct ofd_access_entry_v1 oae = {
.oae_parent_fid = *parent_fid,
.oae_begin = begin,
.oae_segment_count = segment_count,
.oae_flags = flags,
};
-
- oal_write_entry(m->ofd_access_log, &oae, sizeof(oae));
+ struct oal_circ_buf *ocb;
+ struct lu_seq_range range;
+ int rc;
+
+ /* learn target MDT from FID's sequence */
+ range.lsr_flags = LU_SEQ_RANGE_ANY;
+ rc = fld_server_lookup(env, m->ofd_seq_site.ss_server_fld,
+ fid_seq(parent_fid), &range);
+ if (unlikely(rc))
+ CERROR("%s: can't resolve "DFID": rc=%d\n",
+ ofd_name(m), PFID(parent_fid), rc);
+
+ down_read(&oal->oal_buf_list_sem);
+ list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list) {
+ /* filter by MDT index if requested */
+ if (ocb->ocb_filter == 0xffffffff ||
+ range.lsr_index == ocb->ocb_filter)
+ oal_write_entry(ocb, &oae, sizeof(oae));
+ }
+ up_read(&oal->oal_buf_list_sem);
}
}
* The oal will be freed when the last open file handle is closed. */
void ofd_access_log_delete(struct ofd_access_log *oal)
{
+ struct oal_circ_buf *ocb;
+
if (!oal)
return;
oal->oal_is_closed = 1;
- wake_up_all(&oal->oal_read_wait_queue);
+ down_read(&oal->oal_buf_list_sem);
+ list_for_each_entry(ocb, &oal->oal_circ_buf_list, ocb_list) {
+ wake_up_all(&ocb->ocb_read_wait_queue);
+ }
+ up_read(&oal->oal_buf_list_sem);
cdev_device_del(&oal->oal_cdev, &oal->oal_device);
}