X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_changelog.c;h=c3a65f046ce89138a369b2509fed0a73cc41683d;hb=f6e4272fb0be5b798b7685bb40067e3f6877c8a5;hp=fe1ef6e987644a07261a8f54ccff4e6257fdb403;hpb=206b21741b07a10269bbcfdac28743591b64ab2f;p=fs%2Flustre-release.git

Review notes: this copy of the patch was re-flowed from a whitespace-collapsed
extraction and amended, so it no longer matches the blob hashes quoted above.
 * The header names following each "#include" in the first hunk were lost in
   extraction and are left blank; restore them from the repository before
   applying. Indentation was rebuilt with tabs; use "patch -l" if the
   whitespace of context lines differs.
 * chlg_obd_get() released chlg_registered_dev_lock only on the success path;
   it now unlocks before every return.
 * chlg_load() dropped its OBD reference on the poll/retry path with
   class_decref(obd, "changelog", crs) although chlg_obd_get() takes it with
   the device as scope; it now uses chlg_obd_put().
 * chlg_start_thread() applied unlikely() to the mode mask, not to the test.
 * chlg_release() called kref_put(..., chlg_dev_clear) without
   chlg_registered_dev_lock although chlg_dev_clear() edits the global device
   list and mdc_changelog_cdev_finish() takes the lock for the same call.
 * chlg_poll() returned a negative errno as an event mask; it now returns
   POLLERR.
 * The comment above get_target_name() still described the "changelog-" prefix.

diff --git a/lustre/mdc/mdc_changelog.c b/lustre/mdc/mdc_changelog.c
index fe1ef6e..c3a65f0 100644
--- a/lustre/mdc/mdc_changelog.c
+++ b/lustre/mdc/mdc_changelog.c
@@ -33,7 +33,9 @@
 #include
 #include
 #include
-#include
+#include
+#include
+#include
 
 #include
 #include
@@ -58,41 +60,44 @@ static LIST_HEAD(chlg_registered_devices);
 
 struct chlg_registered_dev {
 	/* Device name of the form "changelog-{MDTNAME}" */
-	char			ced_name[32];
-	/* Misc device descriptor */
-	struct miscdevice	ced_misc;
+	char			 ced_name[32];
+	/* changelog char device */
+	struct cdev		 ced_cdev;
+	struct device		 ced_device;
 	/* OBDs referencing this device (multiple mount point) */
-	struct list_head	ced_obds;
+	struct list_head	 ced_obds;
 	/* Reference counter for proper deregistration */
-	struct kref		ced_refs;
+	struct kref		 ced_refs;
 	/* Link within the global chlg_registered_devices */
-	struct list_head	ced_link;
+	struct list_head	 ced_link;
 };
 
 struct chlg_reader_state {
 	/* Shortcut to the corresponding OBD device */
-	struct obd_device	*crs_obd;
+	struct obd_device	   *crs_obd;
+	/* the corresponding chlg_registered_dev */
+	struct chlg_registered_dev *crs_ced;
 	/* Producer thread (if any) */
-	struct task_struct	*crs_prod_task;
+	struct task_struct	   *crs_prod_task;
 	/* An error occurred that prevents from reading further */
-	int			 crs_err;
+	int			    crs_err;
 	/* EOF, no more records available */
-	bool			 crs_eof;
+	bool			    crs_eof;
 	/* Desired start position */
-	__u64			 crs_start_offset;
+	__u64			    crs_start_offset;
 	/* Wait queue for the catalog processing thread */
-	wait_queue_head_t	 crs_waitq_prod;
+	wait_queue_head_t	    crs_waitq_prod;
 	/* Wait queue for the record copy threads */
-	wait_queue_head_t	 crs_waitq_cons;
+	wait_queue_head_t	    crs_waitq_cons;
 	/* Mutex protecting crs_rec_count and crs_rec_queue */
-	struct mutex		 crs_lock;
+	struct mutex		    crs_lock;
 	/* Number of item in the list */
-	__u64			 crs_rec_count;
+	__u64			    crs_rec_count;
 	/* List of prefetched enqueued_record::enq_linkage_items */
-	struct list_head	 crs_rec_queue;
-	unsigned int		 crs_last_catidx;
-	unsigned int		 crs_last_idx;
-	bool			 crs_poll;
+	struct list_head	    crs_rec_queue;
+	unsigned int		    crs_last_catidx;
+	unsigned int		    crs_last_idx;
+	bool			    crs_poll;
 };
 
 struct chlg_rec_entry {
@@ -109,6 +114,91 @@ enum {
 	CDEV_CHLG_MAX_PREFETCH = 1024,
 };
 
+DEFINE_IDR(mdc_changelog_minor_idr);
+static DEFINE_SPINLOCK(chlg_minor_lock);
+
+static int chlg_minor_alloc(int *pminor)
+{
+	void *minor_allocated = (void *)-1;
+	int minor;
+
+	idr_preload(GFP_KERNEL);
+	spin_lock(&chlg_minor_lock);
+	minor = idr_alloc(&mdc_changelog_minor_idr, minor_allocated, 0,
+			  MDC_CHANGELOG_DEV_COUNT, GFP_NOWAIT);
+	spin_unlock(&chlg_minor_lock);
+	idr_preload_end();
+
+	if (minor < 0)
+		return minor;
+
+	*pminor = minor;
+	return 0;
+}
+
+static void chlg_minor_free(int minor)
+{
+	spin_lock(&chlg_minor_lock);
+	idr_remove(&mdc_changelog_minor_idr, minor);
+	spin_unlock(&chlg_minor_lock);
+}
+
+static void chlg_device_release(struct device *dev)
+{
+	struct chlg_registered_dev *entry = dev_get_drvdata(dev);
+
+	chlg_minor_free(MINOR(entry->ced_cdev.dev));
+	OBD_FREE_PTR(entry);
+}
+
+/**
+ * Deregister a changelog character device whose refcount has reached zero.
+ */
+static void chlg_dev_clear(struct kref *kref)
+{
+	struct chlg_registered_dev *entry;
+
+	ENTRY;
+	entry = container_of(kref, struct chlg_registered_dev,
+			     ced_refs);
+
+	list_del(&entry->ced_link);
+	cdev_device_del(&entry->ced_cdev, &entry->ced_device);
+	put_device(&entry->ced_device);
+	EXIT;
+}
+
+/**
+ * Take a reference on the first OBD attached to a changelog device.
+ * chlg_registered_dev_lock is taken and released internally on every path.
+ *
+ * \retval	referenced OBD, to be released with chlg_obd_put()
+ * \retval	NULL if no OBD is attached to \a dev
+ */
+static inline struct obd_device *chlg_obd_get(struct chlg_registered_dev *dev)
+{
+	struct obd_device *obd = NULL;
+
+	mutex_lock(&chlg_registered_dev_lock);
+	if (!list_empty(&dev->ced_obds)) {
+		obd = list_first_entry(&dev->ced_obds, struct obd_device,
+				       u.cli.cl_chg_dev_linkage);
+		class_incref(obd, "changelog", dev);
+	}
+	mutex_unlock(&chlg_registered_dev_lock);
+
+	return obd;
+}
+
+/**
+ * Drop a reference taken with chlg_obd_get().
+ */
+static inline void chlg_obd_put(struct chlg_registered_dev *dev,
+				struct obd_device *obd)
+{
+	class_decref(obd, "changelog", dev);
+}
+
 /**
  * ChangeLog catalog processing callback invoked on each record.
  * If the current record is eligible to userland delivery, push
@@ -142,13 +232,17 @@ static int chlg_read_cat_process_cb(const struct lu_env *env,
 
 	if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
 		rc = -EINVAL;
-		CERROR("%s: not a changelog rec %x/%d in llog "DFID" rc = %d\n",
+		CERROR("%s: not a changelog rec %x/%d in llog : rc = %d\n",
 		       crs->crs_obd->obd_name, rec->cr_hdr.lrh_type,
-		       rec->cr.cr_type,
-		       PFID(lu_object_fid(&llh->lgh_obj->do_lu)), rc);
+		       rec->cr.cr_type, rc);
 		RETURN(rc);
 	}
 
+	/* Check if we can skip the entire llog plain */
+	if (llog_is_plain_skipable(llh->lgh_hdr, hdr, rec->cr.cr_index,
+				   crs->crs_start_offset))
+		RETURN(LLOG_SKIP_PLAIN);
+
 	/* Skip undesired records */
 	if (rec->cr.cr_index < crs->crs_start_offset)
 		RETURN(0);
@@ -181,7 +275,7 @@ static int chlg_read_cat_process_cb(const struct lu_env *env,
 	crs->crs_rec_count++;
 	mutex_unlock(&crs->crs_lock);
 
-	wake_up_all(&crs->crs_waitq_cons);
+	wake_up(&crs->crs_waitq_cons);
 
 	RETURN(0);
 }
@@ -205,20 +299,27 @@ static void enq_record_delete(struct chlg_rec_entry *rec)
 static int chlg_load(void *args)
 {
 	struct chlg_reader_state *crs = args;
-	struct obd_device *obd = crs->crs_obd;
+	struct chlg_registered_dev *ced = crs->crs_ced;
+	struct obd_device *obd = NULL;
 	struct llog_ctxt *ctx = NULL;
 	struct llog_handle *llh = NULL;
 	int rc;
 	ENTRY;
 
+	crs->crs_last_catidx = 0;
+	crs->crs_last_idx = 0;
+
+again:
+	obd = chlg_obd_get(ced);
+	if (obd == NULL)
+		RETURN(-ENODEV);
+
+	crs->crs_obd = obd;
+
 	ctx = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
 	if (ctx == NULL)
 		GOTO(err_out, rc = -ENOENT);
 
-	crs->crs_last_catidx = -1;
-	crs->crs_last_idx = 0;
-
-again:
 	rc = llog_open(NULL, ctx, &llh, NULL, CHANGELOG_CATALOG,
 		       LLOG_OPEN_EXISTS);
 	if (rc) {
@@ -251,7 +352,9 @@ again:
 	}
 	if (!kthread_should_stop() && crs->crs_poll) {
 		llog_cat_close(NULL, llh);
-		schedule_timeout_interruptible(HZ);
+		llog_ctxt_put(ctx);
+		chlg_obd_put(ced, obd);
+		schedule_timeout_interruptible(cfs_time_seconds(1));
 		goto again;
 	}
 
@@ -261,7 +364,7 @@ err_out:
 	if (rc < 0)
 		crs->crs_err = rc;
 
-	wake_up_all(&crs->crs_waitq_cons);
+	wake_up(&crs->crs_waitq_cons);
 
 	if (llh != NULL)
 		llog_cat_close(NULL, llh);
@@ -269,11 +372,47 @@ err_out:
 	if (ctx != NULL)
 		llog_ctxt_put(ctx);
 
+	crs->crs_obd = NULL;
+	chlg_obd_put(ced, obd);
 	wait_event_interruptible(crs->crs_waitq_prod, kthread_should_stop());
 
 	RETURN(rc);
 }
 
+/**
+ * Start the producer thread of a reader the first time it is needed.
+ * Nothing is done if the thread already exists or if the file was not
+ * opened for reading.
+ *
+ * \retval	0 on success, negative errno if the thread cannot be started
+ */
+static int chlg_start_thread(struct file *file)
+{
+	struct chlg_reader_state *crs = file->private_data;
+	struct task_struct *task;
+	int rc = 0;
+
+	if (likely(crs->crs_prod_task))
+		return 0;
+	if (unlikely((file->f_mode & FMODE_READ) == 0))
+		return 0;
+
+	mutex_lock(&crs->crs_lock);
+	if (crs->crs_prod_task == NULL) {
+		task = kthread_run(chlg_load, crs, "chlg_load_thread");
+		if (IS_ERR(task)) {
+			rc = PTR_ERR(task);
+			CERROR("%s: cannot start changelog thread: rc = %d\n",
+			       crs->crs_ced->ced_name, rc);
+			GOTO(out, rc);
+		}
+		crs->crs_prod_task = task;
+	}
+out:
+	mutex_unlock(&crs->crs_lock);
+	return rc;
+}
+
 /**
  * Read handler, dequeues records from the chlg_reader_state if any.
  * No partial records are copied to userland so this function can return less
@@ -306,6 +445,10 @@ static ssize_t chlg_read(struct file *file, char __user *buff, size_t count,
 		RETURN(-EAGAIN);
 	}
 
+	rc = chlg_start_thread(file);
+	if (rc)
+		RETURN(rc);
+
 	rc = wait_event_interruptible(crs->crs_waitq_cons,
 			crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err);
 
@@ -331,7 +474,7 @@ static ssize_t chlg_read(struct file *file, char __user *buff, size_t count,
 
 	if (written_total > 0) {
 		rc = written_total;
-		wake_up_all(&crs->crs_waitq_prod);
+		wake_up(&crs->crs_waitq_prod);
 	} else if (rc == 0) {
 		rc = crs->crs_err;
 	}
@@ -374,7 +517,7 @@ static int chlg_set_start_offset(struct chlg_reader_state *crs, __u64 offset)
 	}
 
 	mutex_unlock(&crs->crs_lock);
-	wake_up_all(&crs->crs_waitq_prod);
+	wake_up(&crs->crs_waitq_prod);
 	return 0;
 }
 
@@ -426,15 +569,23 @@ static loff_t chlg_llseek(struct file *file, loff_t off, int whence)
  */
 static int chlg_clear(struct chlg_reader_state *crs, __u32 reader, __u64 record)
 {
-	struct obd_device *obd = crs->crs_obd;
+	struct obd_device *obd = NULL;
 	struct changelog_setinfo cs = {
 		.cs_recno = record,
 		.cs_id = reader
 	};
+	int rc;
+
+	obd = chlg_obd_get(crs->crs_ced);
+	if (obd == NULL)
+		return -ENODEV;
+
+	rc = obd_set_info_async(NULL, obd->obd_self_export,
+				strlen(KEY_CHANGELOG_CLEAR),
+				KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL);
 
-	return obd_set_info_async(NULL, obd->obd_self_export,
-				  strlen(KEY_CHANGELOG_CLEAR),
-				  KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL);
+	chlg_obd_put(crs->crs_ced, obd);
+	return rc;
 }
 
 /** Maximum changelog control command size */
@@ -493,23 +644,17 @@ out_kbuf:
 static int chlg_open(struct inode *inode, struct file *file)
 {
 	struct chlg_reader_state *crs;
-	struct miscdevice *misc = file->private_data;
 	struct chlg_registered_dev *dev;
-	struct obd_device *obd;
-	struct task_struct *task;
-	int rc;
 	ENTRY;
 
-	dev = container_of(misc, struct chlg_registered_dev, ced_misc);
-	obd = list_first_entry(&dev->ced_obds,
-			       struct obd_device,
-			       u.cli.cl_chg_dev_linkage);
+	dev = container_of(inode->i_cdev, struct chlg_registered_dev, ced_cdev);
 
 	OBD_ALLOC_PTR(crs);
 	if (!crs)
 		RETURN(-ENOMEM);
 
-	crs->crs_obd = obd;
+	kref_get(&dev->ced_refs);
+	crs->crs_ced = dev;
 	crs->crs_err = false;
 	crs->crs_eof = false;
 
@@ -517,24 +662,10 @@ static int chlg_open(struct inode *inode, struct file *file)
 	INIT_LIST_HEAD(&crs->crs_rec_queue);
 	init_waitqueue_head(&crs->crs_waitq_prod);
 	init_waitqueue_head(&crs->crs_waitq_cons);
-
-	if (file->f_mode & FMODE_READ) {
-		task = kthread_run(chlg_load, crs, "chlg_load_thread");
-		if (IS_ERR(task)) {
-			rc = PTR_ERR(task);
-			CERROR("%s: cannot start changelog thread: rc = %d\n",
-			       obd->obd_name, rc);
-			GOTO(err_crs, rc);
-		}
-		crs->crs_prod_task = task;
-	}
+	crs->crs_prod_task = NULL;
 
 	file->private_data = crs;
 	RETURN(0);
-
-err_crs:
-	OBD_FREE_PTR(crs);
-	return rc;
 }
 
 /**
@@ -557,6 +688,10 @@ static int chlg_release(struct inode *inode, struct file *file)
 	list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage)
 		enq_record_delete(rec);
 
+	/* chlg_dev_clear() unlinks the device from chlg_registered_devices */
+	mutex_lock(&chlg_registered_dev_lock);
+	kref_put(&crs->crs_ced->ced_refs, chlg_dev_clear);
+	mutex_unlock(&chlg_registered_dev_lock);
 	OBD_FREE_PTR(crs);
 
 	return rc;
@@ -574,6 +709,11 @@ static unsigned int chlg_poll(struct file *file, poll_table *wait)
 {
 	struct chlg_reader_state *crs = file->private_data;
 	unsigned int mask = 0;
+	int rc;
+
+	rc = chlg_start_thread(file);
+	if (rc)
+		return POLLERR;
 
 	mutex_lock(&crs->crs_lock);
 	poll_wait(file, &crs->crs_waitq_cons, wait);
@@ -619,11 +759,11 @@ static const struct file_operations chlg_fops = {
  * This uses obd_name of the form: "testfs-MDT0000-mdc-ffff88006501600"
- * and returns a name of the form: "changelog-testfs-MDT0000".
+ * and returns a name of the form: "testfs-MDT0000".
  */
-static void get_chlg_name(char *name, size_t name_len, struct obd_device *obd)
+static void get_target_name(char *name, size_t name_len, struct obd_device *obd)
 {
 	int i;
 
-	snprintf(name, name_len, "changelog-%s", obd->obd_name);
+	snprintf(name, name_len, "%s", obd->obd_name);
 
 	/* Find the 2nd '-' from the end and truncate on it */
 	for (i = 0; i < 2; i++) {
@@ -645,6 +785,7 @@ chlg_registered_dev_find_by_name(const char *name)
 {
 	struct chlg_registered_dev *dit;
 
+	LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
 	list_for_each_entry(dit, &chlg_registered_devices, ced_link)
 		if (strcmp(name, dit->ced_name) == 0)
 			return dit;
@@ -663,6 +804,7 @@ chlg_registered_dev_find_by_obd(const struct obd_device *obd)
 	struct chlg_registered_dev *dit;
 	struct obd_device *oit;
 
+	LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
 	list_for_each_entry(dit, &chlg_registered_devices, ced_link)
 		list_for_each_entry(oit, &dit->ced_obds,
 				    u.cli.cl_chg_dev_linkage)
@@ -683,18 +825,14 @@ int mdc_changelog_cdev_init(struct obd_device *obd)
 {
 	struct chlg_registered_dev *exist;
 	struct chlg_registered_dev *entry;
-	int rc;
+	int minor, rc;
 	ENTRY;
 
 	OBD_ALLOC_PTR(entry);
 	if (entry == NULL)
 		RETURN(-ENOMEM);
 
-	get_chlg_name(entry->ced_name, sizeof(entry->ced_name), obd);
-
-	entry->ced_misc.minor = MISC_DYNAMIC_MINOR;
-	entry->ced_misc.name = entry->ced_name;
-	entry->ced_misc.fops = &chlg_fops;
+	get_target_name(entry->ced_name, sizeof(entry->ced_name), obd);
 
 	kref_init(&entry->ced_refs);
 	INIT_LIST_HEAD(&entry->ced_obds);
@@ -711,15 +849,39 @@ int mdc_changelog_cdev_init(struct obd_device *obd)
 	list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds);
 	list_add_tail(&entry->ced_link, &chlg_registered_devices);
 
+	rc = chlg_minor_alloc(&minor);
+	if (rc)
+		GOTO(out_listrm, rc);
+
+	device_initialize(&entry->ced_device);
+	entry->ced_device.devt = MKDEV(MAJOR(mdc_changelog_dev), minor);
+	entry->ced_device.class = mdc_changelog_class;
+	entry->ced_device.release = chlg_device_release;
+	dev_set_drvdata(&entry->ced_device, entry);
+	rc = dev_set_name(&entry->ced_device, "%s-%s", MDC_CHANGELOG_DEV_NAME,
+			  entry->ced_name);
+	if (rc)
+		GOTO(out_minor, rc);
+
 	/* Register new character device */
-	rc = misc_register(&entry->ced_misc);
-	if (rc != 0) {
-		list_del_init(&obd->u.cli.cl_chg_dev_linkage);
-		list_del(&entry->ced_link);
-		GOTO(out_unlock, rc);
-	}
+	cdev_init(&entry->ced_cdev, &chlg_fops);
+	entry->ced_cdev.owner = THIS_MODULE;
+	rc = cdev_device_add(&entry->ced_cdev, &entry->ced_device);
+	if (rc)
+		GOTO(out_device_name, rc);
 
 	entry = NULL;	/* prevent it from being freed below */
+	GOTO(out_unlock, rc = 0);
+
+out_device_name:
+	kfree_const(entry->ced_device.kobj.name);
+
+out_minor:
+	chlg_minor_free(minor);
+
+out_listrm:
+	list_del_init(&obd->u.cli.cl_chg_dev_linkage);
+	list_del(&entry->ced_link);
 
 out_unlock:
 	mutex_unlock(&chlg_registered_dev_lock);
@@ -729,30 +891,15 @@ out_unlock:
 }
 
 /**
- * Deregister a changelog character device whose refcount has reached zero.
- */
-static void chlg_dev_clear(struct kref *kref)
-{
-	struct chlg_registered_dev *entry = container_of(kref,
-						      struct chlg_registered_dev,
-						      ced_refs);
-	ENTRY;
-
-	list_del(&entry->ced_link);
-	misc_deregister(&entry->ced_misc);
-	OBD_FREE_PTR(entry);
-	EXIT;
-}
-
-/**
  * Release OBD, decrease reference count of the corresponding changelog device.
  */
 void mdc_changelog_cdev_finish(struct obd_device *obd)
 {
-	struct chlg_registered_dev *dev = chlg_registered_dev_find_by_obd(obd);
-	ENTRY;
+	struct chlg_registered_dev *dev;
 
+	ENTRY;
 	mutex_lock(&chlg_registered_dev_lock);
+	dev = chlg_registered_dev_find_by_obd(obd);
 	list_del_init(&obd->u.cli.cl_chg_dev_linkage);
 	kref_put(&dev->ced_refs, chlg_dev_clear);
 	mutex_unlock(&chlg_registered_dev_lock);