X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_changelog.c;h=fb9fa54adc38ccc3f9fe5980ee750bbd93ee9cca;hp=3ae27f9cd5329453d4230d659becb46a2fad83d2;hb=5c883ea2748ae9e430a9cd863a9b630b2a74440a;hpb=1d40214d96dd6e36bd39a35f8419f753bae8d305 diff --git a/lustre/mdc/mdc_changelog.c b/lustre/mdc/mdc_changelog.c index 3ae27f9..fb9fa54 100644 --- a/lustre/mdc/mdc_changelog.c +++ b/lustre/mdc/mdc_changelog.c @@ -23,6 +23,8 @@ * Copyright (c) 2017, Commissariat a l'Energie Atomique et aux Energies * Alternatives. * + * Copyright (c) 2017, Intel Corporation. + * * Author: Henri Doreau */ @@ -34,6 +36,7 @@ #include #include +#include #include "mdc_internal.h" @@ -68,25 +71,30 @@ struct chlg_registered_dev { struct chlg_reader_state { /* Shortcut to the corresponding OBD device */ - struct obd_device *crs_obd; + struct obd_device *crs_obd; + /* the corresponding chlg_registered_dev */ + struct chlg_registered_dev *crs_ced; + /* Producer thread (if any) */ + struct task_struct *crs_prod_task; /* An error occurred that prevents from reading further */ - bool crs_err; + int crs_err; /* EOF, no more records available */ - bool crs_eof; - /* Userland reader closed connection */ - bool crs_closed; + bool crs_eof; /* Desired start position */ - __u64 crs_start_offset; + __u64 crs_start_offset; /* Wait queue for the catalog processing thread */ - wait_queue_head_t crs_waitq_prod; + wait_queue_head_t crs_waitq_prod; /* Wait queue for the record copy threads */ - wait_queue_head_t crs_waitq_cons; + wait_queue_head_t crs_waitq_cons; /* Mutex protecting crs_rec_count and crs_rec_queue */ - struct mutex crs_lock; + struct mutex crs_lock; /* Number of item in the list */ - __u64 crs_rec_count; + __u64 crs_rec_count; /* List of prefetched enqueued_record::enq_linkage_items */ - struct list_head crs_rec_queue; + struct list_head crs_rec_queue; + unsigned int crs_last_catidx; + unsigned int crs_last_idx; + bool crs_poll; }; struct chlg_rec_entry { @@ -104,6 +112,43 @@ enum { }; /** + * Deregister a changelog character device whose refcount has reached zero. + */ +static void chlg_dev_clear(struct kref *kref) +{ + struct chlg_registered_dev *entry = container_of(kref, + struct chlg_registered_dev, + ced_refs); + ENTRY; + + list_del(&entry->ced_link); + misc_deregister(&entry->ced_misc); + OBD_FREE_PTR(entry); + EXIT; +} + +static inline struct obd_device* chlg_obd_get(struct chlg_registered_dev *dev) +{ + struct obd_device *obd; + + mutex_lock(&chlg_registered_dev_lock); + if (list_empty(&dev->ced_obds)) + return NULL; + + obd = list_first_entry(&dev->ced_obds, struct obd_device, + u.cli.cl_chg_dev_linkage); + class_incref(obd, "changelog", dev); + mutex_unlock(&chlg_registered_dev_lock); + return obd; +} + +static inline void chlg_obd_put(struct chlg_registered_dev *dev, + struct obd_device *obd) +{ + class_decref(obd, "changelog", dev); +} + +/** * ChangeLog catalog processing callback invoked on each record. * If the current record is eligible to userland delivery, push * it into the crs_rec_queue where the consumer code will fetch it. @@ -122,7 +167,6 @@ static int chlg_read_cat_process_cb(const struct lu_env *env, struct llog_changelog_rec *rec; struct chlg_reader_state *crs = data; struct chlg_rec_entry *enq; - struct l_wait_info lwi = { 0 }; size_t len; int rc; ENTRY; @@ -132,6 +176,9 @@ static int chlg_read_cat_process_cb(const struct lu_env *env, rec = container_of(hdr, struct llog_changelog_rec, cr_hdr); + crs->crs_last_catidx = llh->lgh_hdr->llh_cat_idx; + crs->crs_last_idx = hdr->lrh_index; + if (rec->cr_hdr.lrh_type != CHANGELOG_REC) { rc = -EINVAL; CERROR("%s: not a changelog rec %x/%d in llog "DFID" rc = %d\n", @@ -152,11 +199,11 @@ static int chlg_read_cat_process_cb(const struct lu_env *env, PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rec->cr.cr_namelen, changelog_rec_name(&rec->cr)); - l_wait_event(crs->crs_waitq_prod, - (crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH || - crs->crs_closed), &lwi); + wait_event_interruptible(crs->crs_waitq_prod, + crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH || + kthread_should_stop()); - if (crs->crs_closed) + if (kthread_should_stop()) RETURN(LLOG_PROC_BREAK); len = changelog_rec_size(&rec->cr) + rec->cr.cr_namelen; @@ -188,22 +235,6 @@ static void enq_record_delete(struct chlg_rec_entry *rec) } /** - * Release resources associated to a changelog_reader_state instance. - * - * @param crs CRS instance to release. - */ -static void crs_free(struct chlg_reader_state *crs) -{ - struct chlg_rec_entry *rec; - struct chlg_rec_entry *tmp; - - list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) - enq_record_delete(rec); - - OBD_FREE_PTR(crs); -} - -/** * Record prefetch thread entry point. Opens the changelog catalog and starts * reading records. * @@ -213,13 +244,23 @@ static void crs_free(struct chlg_reader_state *crs) static int chlg_load(void *args) { struct chlg_reader_state *crs = args; - struct obd_device *obd = crs->crs_obd; + struct chlg_registered_dev *ced = crs->crs_ced; + struct obd_device *obd = NULL; struct llog_ctxt *ctx = NULL; struct llog_handle *llh = NULL; - struct l_wait_info lwi = { 0 }; int rc; ENTRY; + crs->crs_last_catidx = -1; + crs->crs_last_idx = 0; + +again: + obd = chlg_obd_get(ced); + if (obd == NULL) + RETURN(-ENODEV); + + crs->crs_obd = obd; + ctx = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); if (ctx == NULL) GOTO(err_out, rc = -ENOENT); @@ -232,21 +273,42 @@ static int chlg_load(void *args) GOTO(err_out, rc); } - rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT|LLOG_F_EXT_JOBID, NULL); + + rc = llog_init_handle(NULL, llh, + LLOG_F_IS_CAT | + LLOG_F_EXT_JOBID | + LLOG_F_EXT_EXTRA_FLAGS | + LLOG_F_EXT_X_UIDGID | + LLOG_F_EXT_X_NID | + LLOG_F_EXT_X_OMODE | + LLOG_F_EXT_X_XATTR, + NULL); if (rc) { CERROR("%s: fail to init llog handle: rc = %d\n", obd->obd_name, rc); GOTO(err_out, rc); } - rc = llog_cat_process(NULL, llh, chlg_read_cat_process_cb, crs, 0, 0); + rc = llog_cat_process(NULL, llh, chlg_read_cat_process_cb, crs, + crs->crs_last_catidx, crs->crs_last_idx); if (rc < 0) { CERROR("%s: fail to process llog: rc = %d\n", obd->obd_name, rc); GOTO(err_out, rc); } + if (!kthread_should_stop() && crs->crs_poll) { + llog_cat_close(NULL, llh); + llog_ctxt_put(ctx); + class_decref(obd, "changelog", crs); + schedule_timeout_interruptible(cfs_time_seconds(1)); + goto again; + } + + crs->crs_eof = true; err_out: - crs->crs_err = true; + if (rc < 0) + crs->crs_err = rc; + wake_up_all(&crs->crs_waitq_cons); if (llh != NULL) @@ -255,8 +317,10 @@ err_out: if (ctx != NULL) llog_ctxt_put(ctx); - l_wait_event(crs->crs_waitq_prod, crs->crs_closed, &lwi); - crs_free(crs); + crs->crs_obd = NULL; + chlg_obd_put(ced, obd); + wait_event_interruptible(crs->crs_waitq_prod, kthread_should_stop()); + RETURN(rc); } @@ -278,17 +342,22 @@ static ssize_t chlg_read(struct file *file, char __user *buff, size_t count, struct chlg_reader_state *crs = file->private_data; struct chlg_rec_entry *rec; struct chlg_rec_entry *tmp; - struct l_wait_info lwi = { 0 }; - ssize_t written_total = 0; + size_t written_total = 0; + ssize_t rc; LIST_HEAD(consumed); ENTRY; - if (file->f_flags & O_NONBLOCK && crs->crs_rec_count == 0) - RETURN(-EAGAIN); + if (file->f_flags & O_NONBLOCK && crs->crs_rec_count == 0) { + if (crs->crs_err < 0) + RETURN(crs->crs_err); + else if (crs->crs_eof) + RETURN(0); + else + RETURN(-EAGAIN); + } - l_wait_event(crs->crs_waitq_cons, - crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err, - &lwi); + rc = wait_event_interruptible(crs->crs_waitq_cons, + crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err); mutex_lock(&crs->crs_lock); list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) { @@ -296,8 +365,7 @@ static ssize_t chlg_read(struct file *file, char __user *buff, size_t count, break; if (copy_to_user(buff, rec->enq_record, rec->enq_length)) { - if (written_total == 0) - written_total = -EFAULT; + rc = -EFAULT; break; } @@ -311,15 +379,19 @@ static ssize_t chlg_read(struct file *file, char __user *buff, size_t count, } mutex_unlock(&crs->crs_lock); - if (written_total > 0) + if (written_total > 0) { + rc = written_total; wake_up_all(&crs->crs_waitq_prod); + } else if (rc == 0) { + rc = crs->crs_err; + } list_for_each_entry_safe(rec, tmp, &consumed, enq_linkage) enq_record_delete(rec); *ppos = crs->crs_start_offset; - RETURN(written_total); + RETURN(rc); } /** @@ -404,15 +476,23 @@ static loff_t chlg_llseek(struct file *file, loff_t off, int whence) */ static int chlg_clear(struct chlg_reader_state *crs, __u32 reader, __u64 record) { - struct obd_device *obd = crs->crs_obd; + struct obd_device *obd = NULL; struct changelog_setinfo cs = { .cs_recno = record, .cs_id = reader }; + int rc; + + obd = chlg_obd_get(crs->crs_ced); + if (obd == NULL) + return -ENODEV; + + rc = obd_set_info_async(NULL, obd->obd_self_export, + strlen(KEY_CHANGELOG_CLEAR), + KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL); - return obd_set_info_async(NULL, obd->obd_self_export, - strlen(KEY_CHANGELOG_CLEAR), - KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL); + chlg_obd_put(crs->crs_ced, obd); + return rc; } /** Maximum changelog control command size */ @@ -462,31 +542,6 @@ out_kbuf: } /** - * Find the OBD device associated to a changelog character device. - * @param[in] cdev character device instance descriptor - * @return corresponding OBD device or NULL if none was found. - */ -static struct obd_device *chlg_obd_get(dev_t cdev) -{ - int minor = MINOR(cdev); - struct obd_device *obd = NULL; - struct chlg_registered_dev *curr; - - mutex_lock(&chlg_registered_dev_lock); - list_for_each_entry(curr, &chlg_registered_devices, ced_link) { - if (curr->ced_misc.minor == minor) { - /* take the first available OBD device attached */ - obd = list_first_entry(&curr->ced_obds, - struct obd_device, - u.cli.cl_chg_dev_linkage); - break; - } - } - mutex_unlock(&chlg_registered_dev_lock); - return obd; -} - -/** * Open handler, initialize internal CRS state and spawn prefetch thread if * needed. * @param[in] inode Inode struct for the open character device. @@ -496,22 +551,22 @@ static struct obd_device *chlg_obd_get(dev_t cdev) static int chlg_open(struct inode *inode, struct file *file) { struct chlg_reader_state *crs; - struct obd_device *obd = chlg_obd_get(inode->i_rdev); + struct miscdevice *misc = file->private_data; + struct chlg_registered_dev *dev; struct task_struct *task; int rc; ENTRY; - if (!obd) - RETURN(-ENODEV); + dev = container_of(misc, struct chlg_registered_dev, ced_misc); OBD_ALLOC_PTR(crs); if (!crs) RETURN(-ENOMEM); - crs->crs_obd = obd; + kref_get(&dev->ced_refs); + crs->crs_ced = dev; crs->crs_err = false; crs->crs_eof = false; - crs->crs_closed = false; mutex_init(&crs->crs_lock); INIT_LIST_HEAD(&crs->crs_rec_queue); @@ -523,15 +578,17 @@ static int chlg_open(struct inode *inode, struct file *file) if (IS_ERR(task)) { rc = PTR_ERR(task); CERROR("%s: cannot start changelog thread: rc = %d\n", - obd->obd_name, rc); + dev->ced_name, rc); GOTO(err_crs, rc); } + crs->crs_prod_task = task; } file->private_data = crs; RETURN(0); err_crs: + kref_put(&dev->ced_refs, chlg_dev_clear); OBD_FREE_PTR(crs); return rc; } @@ -546,15 +603,20 @@ err_crs: static int chlg_release(struct inode *inode, struct file *file) { struct chlg_reader_state *crs = file->private_data; + struct chlg_rec_entry *rec; + struct chlg_rec_entry *tmp; + int rc = 0; - if (file->f_mode & FMODE_READ) { - crs->crs_closed = true; - wake_up_all(&crs->crs_waitq_prod); - } else { - /* No producer thread, release resource ourselves */ - crs_free(crs); - } - return 0; + if (crs->crs_prod_task) + rc = kthread_stop(crs->crs_prod_task); + + list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) + enq_record_delete(rec); + + kref_put(&crs->crs_ced->ced_refs, chlg_dev_clear); + OBD_FREE_PTR(crs); + + return rc; } /** @@ -567,7 +629,7 @@ static int chlg_release(struct inode *inode, struct file *file) */ static unsigned int chlg_poll(struct file *file, poll_table *wait) { - struct chlg_reader_state *crs = file->private_data; + struct chlg_reader_state *crs = file->private_data; unsigned int mask = 0; mutex_lock(&crs->crs_lock); @@ -582,6 +644,23 @@ static unsigned int chlg_poll(struct file *file, poll_table *wait) return mask; } +static long chlg_ioctl(struct file *file, unsigned int cmd, unsigned long arg) +{ + int rc; + + struct chlg_reader_state *crs = file->private_data; + switch (cmd) { + case OBD_IOC_CHLG_POLL: + crs->crs_poll = !!arg; + rc = 0; + break; + default: + rc = -EINVAL; + break; + } + return rc; +} + static const struct file_operations chlg_fops = { .owner = THIS_MODULE, .llseek = chlg_llseek, @@ -590,6 +669,7 @@ static const struct file_operations chlg_fops = { .open = chlg_open, .release = chlg_release, .poll = chlg_poll, + .unlocked_ioctl = chlg_ioctl, }; /** @@ -622,6 +702,7 @@ chlg_registered_dev_find_by_name(const char *name) { struct chlg_registered_dev *dit; + LASSERT(mutex_is_locked(&chlg_registered_dev_lock)); list_for_each_entry(dit, &chlg_registered_devices, ced_link) if (strcmp(name, dit->ced_name) == 0) return dit; @@ -640,6 +721,7 @@ chlg_registered_dev_find_by_obd(const struct obd_device *obd) struct chlg_registered_dev *dit; struct obd_device *oit; + LASSERT(mutex_is_locked(&chlg_registered_dev_lock)); list_for_each_entry(dit, &chlg_registered_devices, ced_link) list_for_each_entry(oit, &dit->ced_obds, u.cli.cl_chg_dev_linkage) @@ -685,13 +767,16 @@ int mdc_changelog_cdev_init(struct obd_device *obd) GOTO(out_unlock, rc = 0); } + list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds); + list_add_tail(&entry->ced_link, &chlg_registered_devices); + /* Register new character device */ rc = misc_register(&entry->ced_misc); - if (rc != 0) + if (rc != 0) { + list_del_init(&obd->u.cli.cl_chg_dev_linkage); + list_del(&entry->ced_link); GOTO(out_unlock, rc); - - list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds); - list_add_tail(&entry->ced_link, &chlg_registered_devices); + } entry = NULL; /* prevent it from being freed below */ @@ -703,30 +788,15 @@ out_unlock: } /** - * Deregister a changelog character device whose refcount has reached zero. - */ -static void chlg_dev_clear(struct kref *kref) -{ - struct chlg_registered_dev *entry = container_of(kref, - struct chlg_registered_dev, - ced_refs); - ENTRY; - - list_del(&entry->ced_link); - misc_deregister(&entry->ced_misc); - OBD_FREE_PTR(entry); - EXIT; -} - -/** * Release OBD, decrease reference count of the corresponding changelog device. */ void mdc_changelog_cdev_finish(struct obd_device *obd) { - struct chlg_registered_dev *dev = chlg_registered_dev_find_by_obd(obd); - ENTRY; + struct chlg_registered_dev *dev; + ENTRY; mutex_lock(&chlg_registered_dev_lock); + dev = chlg_registered_dev_find_by_obd(obd); list_del_init(&obd->u.cli.cl_chg_dev_linkage); kref_put(&dev->ced_refs, chlg_dev_clear); mutex_unlock(&chlg_registered_dev_lock);