Whamcloud - gitweb
853bee2d55dc060ab5cc8bc64dc4679f4f84fad7
[fs/lustre-release.git] / lustre / mdc / mdc_changelog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Commissariat a l'Energie Atomique et aux Energies
24  *                     Alternatives.
25  *
26  * Copyright (c) 2017, Intel Corporation.
27  *
28  * Author: Henri Doreau <henri.doreau@cea.fr>
29  */
30
31 #define DEBUG_SUBSYSTEM S_MDC
32
33 #include <linux/init.h>
34 #include <linux/kthread.h>
35 #include <linux/poll.h>
36 #include <linux/device.h>
37 #include <linux/cdev.h>
38 #include <linux/idr.h>
39
40 #include <lustre_log.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42
43 #include "mdc_internal.h"
44
45
46 /*
47  * -- Changelog delivery through character device --
48  */
49
50 /**
51  * Mutex to protect chlg_registered_devices below
52  */
53 static DEFINE_MUTEX(chlg_registered_dev_lock);
54
55 /**
56  * Global linked list of all registered devices (one per MDT).
57  */
58 static LIST_HEAD(chlg_registered_devices);
59
60
61 struct chlg_registered_dev {
62         /* Device name of the form "changelog-{MDTNAME}" */
63         char                     ced_name[32];
64         /* changelog char device */
65         struct cdev              ced_cdev;
66         struct device            ced_device;
67         /* OBDs referencing this device (multiple mount point) */
68         struct list_head         ced_obds;
69         /* Reference counter for proper deregistration */
70         struct kref              ced_refs;
71         /* Link within the global chlg_registered_devices */
72         struct list_head         ced_link;
73 };
74
75 struct chlg_reader_state {
76         /* Shortcut to the corresponding OBD device */
77         struct obd_device          *crs_obd;
78         /* the corresponding chlg_registered_dev */
79         struct chlg_registered_dev *crs_ced;
80         /* Producer thread (if any) */
81         struct task_struct         *crs_prod_task;
82         /* An error occurred that prevents from reading further */
83         int                         crs_err;
84         /* EOF, no more records available */
85         bool                        crs_eof;
86         /* Desired start position */
87         __u64                       crs_start_offset;
88         /* Wait queue for the catalog processing thread */
89         wait_queue_head_t           crs_waitq_prod;
90         /* Wait queue for the record copy threads */
91         wait_queue_head_t           crs_waitq_cons;
92         /* Mutex protecting crs_rec_count and crs_rec_queue */
93         struct mutex                crs_lock;
94         /* Number of item in the list */
95         __u64                       crs_rec_count;
96         /* List of prefetched enqueued_record::enq_linkage_items */
97         struct list_head            crs_rec_queue;
98         unsigned int                crs_last_catidx;
99         unsigned int                crs_last_idx;
100         bool                        crs_poll;
101 };
102
103 struct chlg_rec_entry {
104         /* Link within the chlg_reader_state::crs_rec_queue list */
105         struct list_head        enq_linkage;
106         /* Data (enq_record) field length */
107         __u64                   enq_length;
108         /* Copy of a changelog record (see struct llog_changelog_rec) */
109         struct changelog_rec    enq_record[];
110 };
111
112 enum {
113         /* Number of records to prefetch locally. */
114         CDEV_CHLG_MAX_PREFETCH = 1024,
115 };
116
117 DEFINE_IDR(mdc_changelog_minor_idr);
118 static DEFINE_SPINLOCK(chlg_minor_lock);
119
120 static int chlg_minor_alloc(int *pminor)
121 {
122         void *minor_allocated = (void *)-1;
123         int minor;
124
125         idr_preload(GFP_KERNEL);
126         spin_lock(&chlg_minor_lock);
127         minor = idr_alloc(&mdc_changelog_minor_idr, minor_allocated, 0,
128                           MDC_CHANGELOG_DEV_COUNT, GFP_NOWAIT);
129         spin_unlock(&chlg_minor_lock);
130         idr_preload_end();
131
132         if (minor < 0)
133                 return minor;
134
135         *pminor = minor;
136         return 0;
137 }
138
139 static void chlg_minor_free(int minor)
140 {
141         spin_lock(&chlg_minor_lock);
142         idr_remove(&mdc_changelog_minor_idr, minor);
143         spin_unlock(&chlg_minor_lock);
144 }
145
146 static void chlg_device_release(struct device *dev)
147 {
148         struct chlg_registered_dev *entry = dev_get_drvdata(dev);
149
150         chlg_minor_free(MINOR(entry->ced_cdev.dev));
151         OBD_FREE_PTR(entry);
152 }
153
154 /**
155  * Deregister a changelog character device whose refcount has reached zero.
156  */
157 static void chlg_dev_clear(struct kref *kref)
158 {
159         struct chlg_registered_dev *entry;
160
161         ENTRY;
162         entry = container_of(kref, struct chlg_registered_dev,
163                              ced_refs);
164
165         list_del(&entry->ced_link);
166         cdev_device_del(&entry->ced_cdev, &entry->ced_device);
167         put_device(&entry->ced_device);
168         EXIT;
169 }
170
171 static inline struct obd_device* chlg_obd_get(struct chlg_registered_dev *dev)
172 {
173         struct obd_device *obd;
174
175         mutex_lock(&chlg_registered_dev_lock);
176         if (list_empty(&dev->ced_obds))
177                 return NULL;
178
179         obd = list_first_entry(&dev->ced_obds, struct obd_device,
180                                u.cli.cl_chg_dev_linkage);
181         class_incref(obd, "changelog", dev);
182         mutex_unlock(&chlg_registered_dev_lock);
183         return obd;
184 }
185
186 static inline void chlg_obd_put(struct chlg_registered_dev *dev,
187                          struct obd_device *obd)
188 {
189         class_decref(obd, "changelog", dev);
190 }
191
192 /**
193  * ChangeLog catalog processing callback invoked on each record.
194  * If the current record is eligible to userland delivery, push
195  * it into the crs_rec_queue where the consumer code will fetch it.
196  *
197  * @param[in]     env  (unused)
198  * @param[in]     llh  Client-side handle used to identify the llog
199  * @param[in]     hdr  Header of the current llog record
200  * @param[in,out] data chlg_reader_state passed from caller
201  *
202  * @return 0 or LLOG_PROC_* control code on success, negated error on failure.
203  */
204 static int chlg_read_cat_process_cb(const struct lu_env *env,
205                                     struct llog_handle *llh,
206                                     struct llog_rec_hdr *hdr, void *data)
207 {
208         struct llog_changelog_rec *rec;
209         struct chlg_reader_state *crs = data;
210         struct chlg_rec_entry *enq;
211         size_t len;
212         int rc;
213         ENTRY;
214
215         LASSERT(crs != NULL);
216         LASSERT(hdr != NULL);
217
218         rec = container_of(hdr, struct llog_changelog_rec, cr_hdr);
219
220         crs->crs_last_catidx = llh->lgh_hdr->llh_cat_idx;
221         crs->crs_last_idx = hdr->lrh_index;
222
223         if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
224                 rc = -EINVAL;
225                 CERROR("%s: not a changelog rec %x/%d in llog : rc = %d\n",
226                        crs->crs_obd->obd_name, rec->cr_hdr.lrh_type,
227                        rec->cr.cr_type, rc);
228                 RETURN(rc);
229         }
230
231         /* Skip undesired records */
232         if (rec->cr.cr_index < crs->crs_start_offset)
233                 RETURN(0);
234
235         CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n",
236                rec->cr.cr_index, rec->cr.cr_type,
237                changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
238                rec->cr.cr_flags & CLF_FLAGMASK,
239                PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
240                rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
241
242         wait_event_interruptible(crs->crs_waitq_prod,
243                                  crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH ||
244                                  kthread_should_stop());
245
246         if (kthread_should_stop())
247                 RETURN(LLOG_PROC_BREAK);
248
249         len = changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
250         OBD_ALLOC(enq, sizeof(*enq) + len);
251         if (enq == NULL)
252                 RETURN(-ENOMEM);
253
254         INIT_LIST_HEAD(&enq->enq_linkage);
255         enq->enq_length = len;
256         memcpy(enq->enq_record, &rec->cr, len);
257
258         mutex_lock(&crs->crs_lock);
259         list_add_tail(&enq->enq_linkage, &crs->crs_rec_queue);
260         crs->crs_rec_count++;
261         mutex_unlock(&crs->crs_lock);
262
263         wake_up(&crs->crs_waitq_cons);
264
265         RETURN(0);
266 }
267
268 /**
269  * Remove record from the list it is attached to and free it.
270  */
271 static void enq_record_delete(struct chlg_rec_entry *rec)
272 {
273         list_del(&rec->enq_linkage);
274         OBD_FREE(rec, sizeof(*rec) + rec->enq_length);
275 }
276
277 /**
278  * Record prefetch thread entry point. Opens the changelog catalog and starts
279  * reading records.
280  *
281  * @param[in,out]  args  chlg_reader_state passed from caller.
282  * @return 0 on success, negated error code on failure.
283  */
284 static int chlg_load(void *args)
285 {
286         struct chlg_reader_state *crs = args;
287         struct chlg_registered_dev *ced = crs->crs_ced;
288         struct obd_device *obd = NULL;
289         struct llog_ctxt *ctx = NULL;
290         struct llog_handle *llh = NULL;
291         int rc;
292         ENTRY;
293
294         crs->crs_last_catidx = 0;
295         crs->crs_last_idx = 0;
296
297 again:
298         obd = chlg_obd_get(ced);
299         if (obd == NULL)
300                 RETURN(-ENODEV);
301
302         crs->crs_obd = obd;
303
304         ctx = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
305         if (ctx == NULL)
306                 GOTO(err_out, rc = -ENOENT);
307
308         rc = llog_open(NULL, ctx, &llh, NULL, CHANGELOG_CATALOG,
309                        LLOG_OPEN_EXISTS);
310         if (rc) {
311                 CERROR("%s: fail to open changelog catalog: rc = %d\n",
312                        obd->obd_name, rc);
313                 GOTO(err_out, rc);
314         }
315
316
317         rc = llog_init_handle(NULL, llh,
318                               LLOG_F_IS_CAT |
319                               LLOG_F_EXT_JOBID |
320                               LLOG_F_EXT_EXTRA_FLAGS |
321                               LLOG_F_EXT_X_UIDGID |
322                               LLOG_F_EXT_X_NID |
323                               LLOG_F_EXT_X_OMODE |
324                               LLOG_F_EXT_X_XATTR,
325                               NULL);
326         if (rc) {
327                 CERROR("%s: fail to init llog handle: rc = %d\n",
328                        obd->obd_name, rc);
329                 GOTO(err_out, rc);
330         }
331
332         rc = llog_cat_process(NULL, llh, chlg_read_cat_process_cb, crs,
333                                 crs->crs_last_catidx, crs->crs_last_idx);
334         if (rc < 0) {
335                 CERROR("%s: fail to process llog: rc = %d\n", obd->obd_name, rc);
336                 GOTO(err_out, rc);
337         }
338         if (!kthread_should_stop() && crs->crs_poll) {
339                 llog_cat_close(NULL, llh);
340                 llog_ctxt_put(ctx);
341                 class_decref(obd, "changelog", crs);
342                 schedule_timeout_interruptible(cfs_time_seconds(1));
343                 goto again;
344         }
345
346         crs->crs_eof = true;
347
348 err_out:
349         if (rc < 0)
350                 crs->crs_err = rc;
351
352         wake_up(&crs->crs_waitq_cons);
353
354         if (llh != NULL)
355                 llog_cat_close(NULL, llh);
356
357         if (ctx != NULL)
358                 llog_ctxt_put(ctx);
359
360         crs->crs_obd = NULL;
361         chlg_obd_put(ced, obd);
362         wait_event_interruptible(crs->crs_waitq_prod, kthread_should_stop());
363
364         RETURN(rc);
365 }
366
367 static int chlg_start_thread(struct file *file)
368 {
369         struct chlg_reader_state *crs = file->private_data;
370         struct task_struct *task;
371         int rc = 0;
372
373         if (likely(crs->crs_prod_task))
374                 return 0;
375         if (unlikely(file->f_mode & FMODE_READ) == 0)
376                 return 0;
377
378         mutex_lock(&crs->crs_lock);
379         if (crs->crs_prod_task == NULL) {
380                 task = kthread_run(chlg_load, crs, "chlg_load_thread");
381                 if (IS_ERR(task)) {
382                         rc = PTR_ERR(task);
383                         CERROR("%s: cannot start changelog thread: rc = %d\n",
384                                crs->crs_ced->ced_name, rc);
385                         GOTO(out, rc);
386                 }
387                 crs->crs_prod_task = task;
388         }
389 out:
390         mutex_unlock(&crs->crs_lock);
391         return rc;
392 }
393
394 /**
395  * Read handler, dequeues records from the chlg_reader_state if any.
396  * No partial records are copied to userland so this function can return less
397  * data than required (short read).
398  *
399  * @param[in]   file   File pointer to the character device.
400  * @param[out]  buff   Userland buffer where to copy the records.
401  * @param[in]   count  Userland buffer size.
402  * @param[out]  ppos   File position, updated with the index number of the next
403  *                     record to read.
404  * @return number of copied bytes on success, negated error code on failure.
405  */
406 static ssize_t chlg_read(struct file *file, char __user *buff, size_t count,
407                          loff_t *ppos)
408 {
409         struct chlg_reader_state *crs = file->private_data;
410         struct chlg_rec_entry *rec;
411         struct chlg_rec_entry *tmp;
412         size_t written_total = 0;
413         ssize_t rc;
414         LIST_HEAD(consumed);
415         ENTRY;
416
417         if (file->f_flags & O_NONBLOCK && crs->crs_rec_count == 0) {
418                 if (crs->crs_err < 0)
419                         RETURN(crs->crs_err);
420                 else if (crs->crs_eof)
421                         RETURN(0);
422                 else
423                         RETURN(-EAGAIN);
424         }
425
426         rc = chlg_start_thread(file);
427         if (rc)
428                 RETURN(rc);
429
430         rc = wait_event_interruptible(crs->crs_waitq_cons,
431                         crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err);
432
433         mutex_lock(&crs->crs_lock);
434         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
435                 if (written_total + rec->enq_length > count)
436                         break;
437
438                 if (copy_to_user(buff, rec->enq_record, rec->enq_length)) {
439                         rc = -EFAULT;
440                         break;
441                 }
442
443                 buff += rec->enq_length;
444                 written_total += rec->enq_length;
445
446                 crs->crs_rec_count--;
447                 list_move_tail(&rec->enq_linkage, &consumed);
448
449                 crs->crs_start_offset = rec->enq_record->cr_index + 1;
450         }
451         mutex_unlock(&crs->crs_lock);
452
453         if (written_total > 0) {
454                 rc = written_total;
455                 wake_up(&crs->crs_waitq_prod);
456         } else if (rc == 0) {
457                 rc = crs->crs_err;
458         }
459
460         list_for_each_entry_safe(rec, tmp, &consumed, enq_linkage)
461                 enq_record_delete(rec);
462
463         *ppos = crs->crs_start_offset;
464
465         RETURN(rc);
466 }
467
468 /**
469  * Jump to a given record index. Helper for chlg_llseek().
470  *
471  * @param[in,out]  crs     Internal reader state.
472  * @param[in]      offset  Desired offset (index record).
473  * @return 0 on success, negated error code on failure.
474  */
475 static int chlg_set_start_offset(struct chlg_reader_state *crs, __u64 offset)
476 {
477         struct chlg_rec_entry *rec;
478         struct chlg_rec_entry *tmp;
479
480         mutex_lock(&crs->crs_lock);
481         if (offset < crs->crs_start_offset) {
482                 mutex_unlock(&crs->crs_lock);
483                 return -ERANGE;
484         }
485
486         crs->crs_start_offset = offset;
487         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
488                 struct changelog_rec *cr = rec->enq_record;
489
490                 if (cr->cr_index >= crs->crs_start_offset)
491                         break;
492
493                 crs->crs_rec_count--;
494                 enq_record_delete(rec);
495         }
496
497         mutex_unlock(&crs->crs_lock);
498         wake_up(&crs->crs_waitq_prod);
499         return 0;
500 }
501
502 /**
503  * Move read pointer to a certain record index, encoded as an offset.
504  *
505  * @param[in,out] file   File pointer to the changelog character device
506  * @param[in]     off    Offset to skip, actually a record index, not byte count
507  * @param[in]     whence Relative/Absolute interpretation of the offset
508  * @return the resulting position on success or negated error code on failure.
509  */
510 static loff_t chlg_llseek(struct file *file, loff_t off, int whence)
511 {
512         struct chlg_reader_state *crs = file->private_data;
513         loff_t pos;
514         int rc;
515
516         switch (whence) {
517         case SEEK_SET:
518                 pos = off;
519                 break;
520         case SEEK_CUR:
521                 pos = file->f_pos + off;
522                 break;
523         case SEEK_END:
524         default:
525                 return -EINVAL;
526         }
527
528         /* We cannot go backward */
529         if (pos < file->f_pos)
530                 return -EINVAL;
531
532         rc = chlg_set_start_offset(crs, pos);
533         if (rc != 0)
534                 return rc;
535
536         file->f_pos = pos;
537         return pos;
538 }
539
540 /**
541  * Clear record range for a given changelog reader.
542  *
543  * @param[in]  crs     Current internal state.
544  * @param[in]  reader  Changelog reader ID (cl1, cl2...)
545  * @param[in]  record  Record index up which to clear
546  * @return 0 on success, negated error code on failure.
547  */
548 static int chlg_clear(struct chlg_reader_state *crs, __u32 reader, __u64 record)
549 {
550         struct obd_device *obd = NULL;
551         struct changelog_setinfo cs  = {
552                 .cs_recno = record,
553                 .cs_id    = reader
554         };
555         int rc;
556
557         obd = chlg_obd_get(crs->crs_ced);
558         if (obd == NULL)
559                 return -ENODEV;
560
561         rc = obd_set_info_async(NULL, obd->obd_self_export,
562                                 strlen(KEY_CHANGELOG_CLEAR),
563                                 KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL);
564
565         chlg_obd_put(crs->crs_ced, obd);
566         return rc;
567 }
568
569 /** Maximum changelog control command size */
570 #define CHLG_CONTROL_CMD_MAX    64
571
572 /**
573  * Handle writes() into the changelog character device. Write() can be used
574  * to request special control operations.
575  *
576  * @param[in]  file  File pointer to the changelog character device
577  * @param[in]  buff  User supplied data (written data)
578  * @param[in]  count Number of written bytes
579  * @param[in]  off   (unused)
580  * @return number of written bytes on success, negated error code on failure.
581  */
582 static ssize_t chlg_write(struct file *file, const char __user *buff,
583                           size_t count, loff_t *off)
584 {
585         struct chlg_reader_state *crs = file->private_data;
586         char *kbuf;
587         __u64 record;
588         __u32 reader;
589         int rc = 0;
590         ENTRY;
591
592         if (count > CHLG_CONTROL_CMD_MAX)
593                 RETURN(-EINVAL);
594
595         OBD_ALLOC(kbuf, CHLG_CONTROL_CMD_MAX);
596         if (kbuf == NULL)
597                 RETURN(-ENOMEM);
598
599         if (copy_from_user(kbuf, buff, count))
600                 GOTO(out_kbuf, rc = -EFAULT);
601
602         kbuf[CHLG_CONTROL_CMD_MAX - 1] = '\0';
603
604         if (sscanf(kbuf, "clear:cl%u:%llu", &reader, &record) == 2)
605                 rc = chlg_clear(crs, reader, record);
606         else
607                 rc = -EINVAL;
608
609         EXIT;
610 out_kbuf:
611         OBD_FREE(kbuf, CHLG_CONTROL_CMD_MAX);
612         return rc < 0 ? rc : count;
613 }
614
615 /**
616  * Open handler, initialize internal CRS state and spawn prefetch thread if
617  * needed.
618  * @param[in]  inode  Inode struct for the open character device.
619  * @param[in]  file   Corresponding file pointer.
620  * @return 0 on success, negated error code on failure.
621  */
622 static int chlg_open(struct inode *inode, struct file *file)
623 {
624         struct chlg_reader_state *crs;
625         struct chlg_registered_dev *dev;
626         ENTRY;
627
628         dev = container_of(inode->i_cdev, struct chlg_registered_dev, ced_cdev);
629
630         OBD_ALLOC_PTR(crs);
631         if (!crs)
632                 RETURN(-ENOMEM);
633
634         kref_get(&dev->ced_refs);
635         crs->crs_ced = dev;
636         crs->crs_err = false;
637         crs->crs_eof = false;
638
639         mutex_init(&crs->crs_lock);
640         INIT_LIST_HEAD(&crs->crs_rec_queue);
641         init_waitqueue_head(&crs->crs_waitq_prod);
642         init_waitqueue_head(&crs->crs_waitq_cons);
643         crs->crs_prod_task = NULL;
644
645         file->private_data = crs;
646         RETURN(0);
647 }
648
649 /**
650  * Close handler, release resources.
651  *
652  * @param[in]  inode  Inode struct for the open character device.
653  * @param[in]  file   Corresponding file pointer.
654  * @return 0 on success, negated error code on failure.
655  */
656 static int chlg_release(struct inode *inode, struct file *file)
657 {
658         struct chlg_reader_state *crs = file->private_data;
659         struct chlg_rec_entry *rec;
660         struct chlg_rec_entry *tmp;
661         int rc = 0;
662
663         if (crs->crs_prod_task)
664                 rc = kthread_stop(crs->crs_prod_task);
665
666         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage)
667                 enq_record_delete(rec);
668
669         kref_put(&crs->crs_ced->ced_refs, chlg_dev_clear);
670         OBD_FREE_PTR(crs);
671
672         return rc;
673 }
674
675 /**
676  * Poll handler, indicates whether the device is readable (new records) and
677  * writable (always).
678  *
679  * @param[in]  file   Device file pointer.
680  * @param[in]  wait   (opaque)
681  * @return combination of the poll status flags.
682  */
683 static unsigned int chlg_poll(struct file *file, poll_table *wait)
684 {
685         struct chlg_reader_state *crs = file->private_data;
686         unsigned int mask = 0;
687         int rc;
688
689         rc = chlg_start_thread(file);
690         if (rc)
691                 RETURN(rc);
692
693         mutex_lock(&crs->crs_lock);
694         poll_wait(file, &crs->crs_waitq_cons, wait);
695         if (crs->crs_rec_count > 0)
696                 mask |= POLLIN | POLLRDNORM;
697         if (crs->crs_err)
698                 mask |= POLLERR;
699         if (crs->crs_eof)
700                 mask |= POLLHUP;
701         mutex_unlock(&crs->crs_lock);
702         return mask;
703 }
704
705 static long chlg_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
706 {
707         int rc;
708
709         struct chlg_reader_state *crs = file->private_data;
710         switch (cmd) {
711         case OBD_IOC_CHLG_POLL:
712                 crs->crs_poll = !!arg;
713                 rc = 0;
714                 break;
715         default:
716                 rc = -EINVAL;
717                 break;
718         }
719         return rc;
720 }
721
722 static const struct file_operations chlg_fops = {
723         .owner          = THIS_MODULE,
724         .llseek         = chlg_llseek,
725         .read           = chlg_read,
726         .write          = chlg_write,
727         .open           = chlg_open,
728         .release        = chlg_release,
729         .poll           = chlg_poll,
730         .unlocked_ioctl = chlg_ioctl,
731 };
732
733 /**
734  * This uses obd_name of the form: "testfs-MDT0000-mdc-ffff88006501600"
735  * and returns a name of the form: "changelog-testfs-MDT0000".
736  */
737 static void get_target_name(char *name, size_t name_len, struct obd_device *obd)
738 {
739         int i;
740
741         snprintf(name, name_len, "%s", obd->obd_name);
742
743         /* Find the 2nd '-' from the end and truncate on it */
744         for (i = 0; i < 2; i++) {
745                 char *p = strrchr(name, '-');
746
747                 if (p == NULL)
748                         return;
749                 *p = '\0';
750         }
751 }
752
753 /**
754  * Find a changelog character device by name.
755  * All devices registered during MDC setup are listed in a global list with
756  * their names attached.
757  */
758 static struct chlg_registered_dev *
759 chlg_registered_dev_find_by_name(const char *name)
760 {
761         struct chlg_registered_dev *dit;
762
763         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
764         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
765                 if (strcmp(name, dit->ced_name) == 0)
766                         return dit;
767         return NULL;
768 }
769
770 /**
771  * Find chlg_registered_dev structure for a given OBD device.
772  * This is bad O(n^2) but for each filesystem:
773  *   - N is # of MDTs times # of mount points
774  *   - this only runs at shutdown
775  */
776 static struct chlg_registered_dev *
777 chlg_registered_dev_find_by_obd(const struct obd_device *obd)
778 {
779         struct chlg_registered_dev *dit;
780         struct obd_device *oit;
781
782         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
783         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
784                 list_for_each_entry(oit, &dit->ced_obds,
785                                     u.cli.cl_chg_dev_linkage)
786                         if (oit == obd)
787                                 return dit;
788         return NULL;
789 }
790
791 /**
792  * Changelog character device initialization.
793  * Register a misc character device with a dynamic minor number, under a name
794  * of the form: 'changelog-fsname-MDTxxxx'. Reference this OBD device with it.
795  *
796  * @param[in] obd  This MDC obd_device.
797  * @return 0 on success, negated error code on failure.
798  */
799 int mdc_changelog_cdev_init(struct obd_device *obd)
800 {
801         struct chlg_registered_dev *exist;
802         struct chlg_registered_dev *entry;
803         int minor, rc;
804         ENTRY;
805
806         OBD_ALLOC_PTR(entry);
807         if (entry == NULL)
808                 RETURN(-ENOMEM);
809
810         get_target_name(entry->ced_name, sizeof(entry->ced_name), obd);
811
812         kref_init(&entry->ced_refs);
813         INIT_LIST_HEAD(&entry->ced_obds);
814         INIT_LIST_HEAD(&entry->ced_link);
815
816         mutex_lock(&chlg_registered_dev_lock);
817         exist = chlg_registered_dev_find_by_name(entry->ced_name);
818         if (exist != NULL) {
819                 kref_get(&exist->ced_refs);
820                 list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &exist->ced_obds);
821                 GOTO(out_unlock, rc = 0);
822         }
823
824         list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds);
825         list_add_tail(&entry->ced_link, &chlg_registered_devices);
826
827         rc = chlg_minor_alloc(&minor);
828         if (rc)
829                 GOTO(out_listrm, rc);
830
831         device_initialize(&entry->ced_device);
832         entry->ced_device.devt = MKDEV(MAJOR(mdc_changelog_dev), minor);
833         entry->ced_device.class = mdc_changelog_class;
834         entry->ced_device.release = chlg_device_release;
835         dev_set_drvdata(&entry->ced_device, entry);
836         rc = dev_set_name(&entry->ced_device, "%s-%s", MDC_CHANGELOG_DEV_NAME,
837                           entry->ced_name);
838         if (rc)
839                 GOTO(out_minor, rc);
840
841         /* Register new character device */
842         cdev_init(&entry->ced_cdev, &chlg_fops);
843         entry->ced_cdev.owner = THIS_MODULE;
844         rc = cdev_device_add(&entry->ced_cdev, &entry->ced_device);
845         if (rc)
846                 GOTO(out_device_name, rc);
847
848         entry = NULL;   /* prevent it from being freed below */
849         GOTO(out_unlock, rc = 0);
850
851 out_device_name:
852         kfree_const(entry->ced_device.kobj.name);
853
854 out_minor:
855         chlg_minor_free(minor);
856
857 out_listrm:
858         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
859         list_del(&entry->ced_link);
860
861 out_unlock:
862         mutex_unlock(&chlg_registered_dev_lock);
863         if (entry)
864                 OBD_FREE_PTR(entry);
865         RETURN(rc);
866 }
867
868 /**
869  * Release OBD, decrease reference count of the corresponding changelog device.
870  */
871 void mdc_changelog_cdev_finish(struct obd_device *obd)
872 {
873         struct chlg_registered_dev *dev;
874
875         ENTRY;
876         mutex_lock(&chlg_registered_dev_lock);
877         dev = chlg_registered_dev_find_by_obd(obd);
878         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
879         kref_put(&dev->ced_refs, chlg_dev_clear);
880         mutex_unlock(&chlg_registered_dev_lock);
881         EXIT;
882 }