Whamcloud - gitweb
LU-6142 mdc: include linux/idr.h for referenced code
[fs/lustre-release.git] / lustre / mdc / mdc_changelog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Commissariat a l'Energie Atomique et aux Energies
24  *                     Alternatives.
25  *
26  * Copyright (c) 2017, Intel Corporation.
27  *
28  * Author: Henri Doreau <henri.doreau@cea.fr>
29  */
30
31 #define DEBUG_SUBSYSTEM S_MDC
32
33 #include <linux/init.h>
34 #include <linux/kthread.h>
35 #include <linux/poll.h>
36 #include <linux/device.h>
37 #include <linux/cdev.h>
38 #include <linux/idr.h>
39
40 #include <lustre_log.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42
43 #include "mdc_internal.h"
44
45
46 /*
47  * -- Changelog delivery through character device --
48  */
49
50 /**
51  * Mutex to protect chlg_registered_devices below
52  */
53 static DEFINE_MUTEX(chlg_registered_dev_lock);
54
55 /**
56  * Global linked list of all registered devices (one per MDT).
57  */
58 static LIST_HEAD(chlg_registered_devices);
59
60
61 struct chlg_registered_dev {
62         /* Device name of the form "changelog-{MDTNAME}" */
63         char                     ced_name[32];
64         /* changelog char device */
65         struct cdev              ced_cdev;
66         struct device            ced_device;
67         /* OBDs referencing this device (multiple mount point) */
68         struct list_head         ced_obds;
69         /* Reference counter for proper deregistration */
70         struct kref              ced_refs;
71         /* Link within the global chlg_registered_devices */
72         struct list_head         ced_link;
73 };
74
75 struct chlg_reader_state {
76         /* Shortcut to the corresponding OBD device */
77         struct obd_device          *crs_obd;
78         /* the corresponding chlg_registered_dev */
79         struct chlg_registered_dev *crs_ced;
80         /* Producer thread (if any) */
81         struct task_struct         *crs_prod_task;
82         /* An error occurred that prevents from reading further */
83         int                         crs_err;
84         /* EOF, no more records available */
85         bool                        crs_eof;
86         /* Desired start position */
87         __u64                       crs_start_offset;
88         /* Wait queue for the catalog processing thread */
89         wait_queue_head_t           crs_waitq_prod;
90         /* Wait queue for the record copy threads */
91         wait_queue_head_t           crs_waitq_cons;
92         /* Mutex protecting crs_rec_count and crs_rec_queue */
93         struct mutex                crs_lock;
94         /* Number of item in the list */
95         __u64                       crs_rec_count;
96         /* List of prefetched enqueued_record::enq_linkage_items */
97         struct list_head            crs_rec_queue;
98         unsigned int                crs_last_catidx;
99         unsigned int                crs_last_idx;
100         bool                        crs_poll;
101 };
102
103 struct chlg_rec_entry {
104         /* Link within the chlg_reader_state::crs_rec_queue list */
105         struct list_head        enq_linkage;
106         /* Data (enq_record) field length */
107         __u64                   enq_length;
108         /* Copy of a changelog record (see struct llog_changelog_rec) */
109         struct changelog_rec    enq_record[];
110 };
111
112 enum {
113         /* Number of records to prefetch locally. */
114         CDEV_CHLG_MAX_PREFETCH = 1024,
115 };
116
117 DEFINE_IDR(mdc_changelog_minor_idr);
118 static DEFINE_SPINLOCK(chlg_minor_lock);
119
120 static int chlg_minor_alloc(int *pminor)
121 {
122         void *minor_allocated = (void *)-1;
123         int minor;
124
125         idr_preload(GFP_KERNEL);
126         spin_lock(&chlg_minor_lock);
127         minor = idr_alloc(&mdc_changelog_minor_idr, minor_allocated, 0,
128                           MDC_CHANGELOG_DEV_COUNT, GFP_NOWAIT);
129         spin_unlock(&chlg_minor_lock);
130         idr_preload_end();
131
132         if (minor < 0)
133                 return minor;
134
135         *pminor = minor;
136         return 0;
137 }
138
139 static void chlg_minor_free(int minor)
140 {
141         spin_lock(&chlg_minor_lock);
142         idr_remove(&mdc_changelog_minor_idr, minor);
143         spin_unlock(&chlg_minor_lock);
144 }
145
146 static void chlg_device_release(struct device *dev)
147 {
148         struct chlg_registered_dev *entry = dev_get_drvdata(dev);
149
150         chlg_minor_free(MINOR(entry->ced_cdev.dev));
151         OBD_FREE_PTR(entry);
152 }
153
154 /**
155  * Deregister a changelog character device whose refcount has reached zero.
156  */
157 static void chlg_dev_clear(struct kref *kref)
158 {
159         struct chlg_registered_dev *entry;
160
161         ENTRY;
162         entry = container_of(kref, struct chlg_registered_dev,
163                              ced_refs);
164
165         list_del(&entry->ced_link);
166         cdev_device_del(&entry->ced_cdev, &entry->ced_device);
167         put_device(&entry->ced_device);
168         EXIT;
169 }
170
171 static inline struct obd_device* chlg_obd_get(struct chlg_registered_dev *dev)
172 {
173         struct obd_device *obd;
174
175         mutex_lock(&chlg_registered_dev_lock);
176         if (list_empty(&dev->ced_obds))
177                 return NULL;
178
179         obd = list_first_entry(&dev->ced_obds, struct obd_device,
180                                u.cli.cl_chg_dev_linkage);
181         class_incref(obd, "changelog", dev);
182         mutex_unlock(&chlg_registered_dev_lock);
183         return obd;
184 }
185
186 static inline void chlg_obd_put(struct chlg_registered_dev *dev,
187                          struct obd_device *obd)
188 {
189         class_decref(obd, "changelog", dev);
190 }
191
192 /**
193  * ChangeLog catalog processing callback invoked on each record.
194  * If the current record is eligible to userland delivery, push
195  * it into the crs_rec_queue where the consumer code will fetch it.
196  *
197  * @param[in]     env  (unused)
198  * @param[in]     llh  Client-side handle used to identify the llog
199  * @param[in]     hdr  Header of the current llog record
200  * @param[in,out] data chlg_reader_state passed from caller
201  *
202  * @return 0 or LLOG_PROC_* control code on success, negated error on failure.
203  */
204 static int chlg_read_cat_process_cb(const struct lu_env *env,
205                                     struct llog_handle *llh,
206                                     struct llog_rec_hdr *hdr, void *data)
207 {
208         struct llog_changelog_rec *rec;
209         struct chlg_reader_state *crs = data;
210         struct chlg_rec_entry *enq;
211         size_t len;
212         int rc;
213         ENTRY;
214
215         LASSERT(crs != NULL);
216         LASSERT(hdr != NULL);
217
218         rec = container_of(hdr, struct llog_changelog_rec, cr_hdr);
219
220         crs->crs_last_catidx = llh->lgh_hdr->llh_cat_idx;
221         crs->crs_last_idx = hdr->lrh_index;
222
223         if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
224                 rc = -EINVAL;
225                 CERROR("%s: not a changelog rec %x/%d in llog "DFID" rc = %d\n",
226                        crs->crs_obd->obd_name, rec->cr_hdr.lrh_type,
227                        rec->cr.cr_type,
228                        PFID(lu_object_fid(&llh->lgh_obj->do_lu)), rc);
229                 RETURN(rc);
230         }
231
232         /* Skip undesired records */
233         if (rec->cr.cr_index < crs->crs_start_offset)
234                 RETURN(0);
235
236         CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n",
237                rec->cr.cr_index, rec->cr.cr_type,
238                changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
239                rec->cr.cr_flags & CLF_FLAGMASK,
240                PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
241                rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
242
243         wait_event_interruptible(crs->crs_waitq_prod,
244                                  crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH ||
245                                  kthread_should_stop());
246
247         if (kthread_should_stop())
248                 RETURN(LLOG_PROC_BREAK);
249
250         len = changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
251         OBD_ALLOC(enq, sizeof(*enq) + len);
252         if (enq == NULL)
253                 RETURN(-ENOMEM);
254
255         INIT_LIST_HEAD(&enq->enq_linkage);
256         enq->enq_length = len;
257         memcpy(enq->enq_record, &rec->cr, len);
258
259         mutex_lock(&crs->crs_lock);
260         list_add_tail(&enq->enq_linkage, &crs->crs_rec_queue);
261         crs->crs_rec_count++;
262         mutex_unlock(&crs->crs_lock);
263
264         wake_up(&crs->crs_waitq_cons);
265
266         RETURN(0);
267 }
268
269 /**
270  * Remove record from the list it is attached to and free it.
271  */
272 static void enq_record_delete(struct chlg_rec_entry *rec)
273 {
274         list_del(&rec->enq_linkage);
275         OBD_FREE(rec, sizeof(*rec) + rec->enq_length);
276 }
277
278 /**
279  * Record prefetch thread entry point. Opens the changelog catalog and starts
280  * reading records.
281  *
282  * @param[in,out]  args  chlg_reader_state passed from caller.
283  * @return 0 on success, negated error code on failure.
284  */
285 static int chlg_load(void *args)
286 {
287         struct chlg_reader_state *crs = args;
288         struct chlg_registered_dev *ced = crs->crs_ced;
289         struct obd_device *obd = NULL;
290         struct llog_ctxt *ctx = NULL;
291         struct llog_handle *llh = NULL;
292         int rc;
293         ENTRY;
294
295         crs->crs_last_catidx = 0;
296         crs->crs_last_idx = 0;
297
298 again:
299         obd = chlg_obd_get(ced);
300         if (obd == NULL)
301                 RETURN(-ENODEV);
302
303         crs->crs_obd = obd;
304
305         ctx = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
306         if (ctx == NULL)
307                 GOTO(err_out, rc = -ENOENT);
308
309         rc = llog_open(NULL, ctx, &llh, NULL, CHANGELOG_CATALOG,
310                        LLOG_OPEN_EXISTS);
311         if (rc) {
312                 CERROR("%s: fail to open changelog catalog: rc = %d\n",
313                        obd->obd_name, rc);
314                 GOTO(err_out, rc);
315         }
316
317
318         rc = llog_init_handle(NULL, llh,
319                               LLOG_F_IS_CAT |
320                               LLOG_F_EXT_JOBID |
321                               LLOG_F_EXT_EXTRA_FLAGS |
322                               LLOG_F_EXT_X_UIDGID |
323                               LLOG_F_EXT_X_NID |
324                               LLOG_F_EXT_X_OMODE |
325                               LLOG_F_EXT_X_XATTR,
326                               NULL);
327         if (rc) {
328                 CERROR("%s: fail to init llog handle: rc = %d\n",
329                        obd->obd_name, rc);
330                 GOTO(err_out, rc);
331         }
332
333         rc = llog_cat_process(NULL, llh, chlg_read_cat_process_cb, crs,
334                                 crs->crs_last_catidx, crs->crs_last_idx);
335         if (rc < 0) {
336                 CERROR("%s: fail to process llog: rc = %d\n", obd->obd_name, rc);
337                 GOTO(err_out, rc);
338         }
339         if (!kthread_should_stop() && crs->crs_poll) {
340                 llog_cat_close(NULL, llh);
341                 llog_ctxt_put(ctx);
342                 class_decref(obd, "changelog", crs);
343                 schedule_timeout_interruptible(cfs_time_seconds(1));
344                 goto again;
345         }
346
347         crs->crs_eof = true;
348
349 err_out:
350         if (rc < 0)
351                 crs->crs_err = rc;
352
353         wake_up(&crs->crs_waitq_cons);
354
355         if (llh != NULL)
356                 llog_cat_close(NULL, llh);
357
358         if (ctx != NULL)
359                 llog_ctxt_put(ctx);
360
361         crs->crs_obd = NULL;
362         chlg_obd_put(ced, obd);
363         wait_event_interruptible(crs->crs_waitq_prod, kthread_should_stop());
364
365         RETURN(rc);
366 }
367
368 /**
369  * Read handler, dequeues records from the chlg_reader_state if any.
370  * No partial records are copied to userland so this function can return less
371  * data than required (short read).
372  *
373  * @param[in]   file   File pointer to the character device.
374  * @param[out]  buff   Userland buffer where to copy the records.
375  * @param[in]   count  Userland buffer size.
376  * @param[out]  ppos   File position, updated with the index number of the next
377  *                     record to read.
378  * @return number of copied bytes on success, negated error code on failure.
379  */
380 static ssize_t chlg_read(struct file *file, char __user *buff, size_t count,
381                          loff_t *ppos)
382 {
383         struct chlg_reader_state *crs = file->private_data;
384         struct chlg_rec_entry *rec;
385         struct chlg_rec_entry *tmp;
386         size_t written_total = 0;
387         ssize_t rc;
388         LIST_HEAD(consumed);
389         ENTRY;
390
391         if (file->f_flags & O_NONBLOCK && crs->crs_rec_count == 0) {
392                 if (crs->crs_err < 0)
393                         RETURN(crs->crs_err);
394                 else if (crs->crs_eof)
395                         RETURN(0);
396                 else
397                         RETURN(-EAGAIN);
398         }
399
400         rc = wait_event_interruptible(crs->crs_waitq_cons,
401                         crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err);
402
403         mutex_lock(&crs->crs_lock);
404         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
405                 if (written_total + rec->enq_length > count)
406                         break;
407
408                 if (copy_to_user(buff, rec->enq_record, rec->enq_length)) {
409                         rc = -EFAULT;
410                         break;
411                 }
412
413                 buff += rec->enq_length;
414                 written_total += rec->enq_length;
415
416                 crs->crs_rec_count--;
417                 list_move_tail(&rec->enq_linkage, &consumed);
418
419                 crs->crs_start_offset = rec->enq_record->cr_index + 1;
420         }
421         mutex_unlock(&crs->crs_lock);
422
423         if (written_total > 0) {
424                 rc = written_total;
425                 wake_up(&crs->crs_waitq_prod);
426         } else if (rc == 0) {
427                 rc = crs->crs_err;
428         }
429
430         list_for_each_entry_safe(rec, tmp, &consumed, enq_linkage)
431                 enq_record_delete(rec);
432
433         *ppos = crs->crs_start_offset;
434
435         RETURN(rc);
436 }
437
438 /**
439  * Jump to a given record index. Helper for chlg_llseek().
440  *
441  * @param[in,out]  crs     Internal reader state.
442  * @param[in]      offset  Desired offset (index record).
443  * @return 0 on success, negated error code on failure.
444  */
445 static int chlg_set_start_offset(struct chlg_reader_state *crs, __u64 offset)
446 {
447         struct chlg_rec_entry *rec;
448         struct chlg_rec_entry *tmp;
449
450         mutex_lock(&crs->crs_lock);
451         if (offset < crs->crs_start_offset) {
452                 mutex_unlock(&crs->crs_lock);
453                 return -ERANGE;
454         }
455
456         crs->crs_start_offset = offset;
457         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
458                 struct changelog_rec *cr = rec->enq_record;
459
460                 if (cr->cr_index >= crs->crs_start_offset)
461                         break;
462
463                 crs->crs_rec_count--;
464                 enq_record_delete(rec);
465         }
466
467         mutex_unlock(&crs->crs_lock);
468         wake_up(&crs->crs_waitq_prod);
469         return 0;
470 }
471
472 /**
473  * Move read pointer to a certain record index, encoded as an offset.
474  *
475  * @param[in,out] file   File pointer to the changelog character device
476  * @param[in]     off    Offset to skip, actually a record index, not byte count
477  * @param[in]     whence Relative/Absolute interpretation of the offset
478  * @return the resulting position on success or negated error code on failure.
479  */
480 static loff_t chlg_llseek(struct file *file, loff_t off, int whence)
481 {
482         struct chlg_reader_state *crs = file->private_data;
483         loff_t pos;
484         int rc;
485
486         switch (whence) {
487         case SEEK_SET:
488                 pos = off;
489                 break;
490         case SEEK_CUR:
491                 pos = file->f_pos + off;
492                 break;
493         case SEEK_END:
494         default:
495                 return -EINVAL;
496         }
497
498         /* We cannot go backward */
499         if (pos < file->f_pos)
500                 return -EINVAL;
501
502         rc = chlg_set_start_offset(crs, pos);
503         if (rc != 0)
504                 return rc;
505
506         file->f_pos = pos;
507         return pos;
508 }
509
510 /**
511  * Clear record range for a given changelog reader.
512  *
513  * @param[in]  crs     Current internal state.
514  * @param[in]  reader  Changelog reader ID (cl1, cl2...)
515  * @param[in]  record  Record index up which to clear
516  * @return 0 on success, negated error code on failure.
517  */
518 static int chlg_clear(struct chlg_reader_state *crs, __u32 reader, __u64 record)
519 {
520         struct obd_device *obd = NULL;
521         struct changelog_setinfo cs  = {
522                 .cs_recno = record,
523                 .cs_id    = reader
524         };
525         int rc;
526
527         obd = chlg_obd_get(crs->crs_ced);
528         if (obd == NULL)
529                 return -ENODEV;
530
531         rc = obd_set_info_async(NULL, obd->obd_self_export,
532                                 strlen(KEY_CHANGELOG_CLEAR),
533                                 KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL);
534
535         chlg_obd_put(crs->crs_ced, obd);
536         return rc;
537 }
538
539 /** Maximum changelog control command size */
540 #define CHLG_CONTROL_CMD_MAX    64
541
542 /**
543  * Handle writes() into the changelog character device. Write() can be used
544  * to request special control operations.
545  *
546  * @param[in]  file  File pointer to the changelog character device
547  * @param[in]  buff  User supplied data (written data)
548  * @param[in]  count Number of written bytes
549  * @param[in]  off   (unused)
550  * @return number of written bytes on success, negated error code on failure.
551  */
552 static ssize_t chlg_write(struct file *file, const char __user *buff,
553                           size_t count, loff_t *off)
554 {
555         struct chlg_reader_state *crs = file->private_data;
556         char *kbuf;
557         __u64 record;
558         __u32 reader;
559         int rc = 0;
560         ENTRY;
561
562         if (count > CHLG_CONTROL_CMD_MAX)
563                 RETURN(-EINVAL);
564
565         OBD_ALLOC(kbuf, CHLG_CONTROL_CMD_MAX);
566         if (kbuf == NULL)
567                 RETURN(-ENOMEM);
568
569         if (copy_from_user(kbuf, buff, count))
570                 GOTO(out_kbuf, rc = -EFAULT);
571
572         kbuf[CHLG_CONTROL_CMD_MAX - 1] = '\0';
573
574         if (sscanf(kbuf, "clear:cl%u:%llu", &reader, &record) == 2)
575                 rc = chlg_clear(crs, reader, record);
576         else
577                 rc = -EINVAL;
578
579         EXIT;
580 out_kbuf:
581         OBD_FREE(kbuf, CHLG_CONTROL_CMD_MAX);
582         return rc < 0 ? rc : count;
583 }
584
585 /**
586  * Open handler, initialize internal CRS state and spawn prefetch thread if
587  * needed.
588  * @param[in]  inode  Inode struct for the open character device.
589  * @param[in]  file   Corresponding file pointer.
590  * @return 0 on success, negated error code on failure.
591  */
592 static int chlg_open(struct inode *inode, struct file *file)
593 {
594         struct chlg_reader_state *crs;
595         struct chlg_registered_dev *dev;
596         struct task_struct *task;
597         int rc;
598         ENTRY;
599
600         dev = container_of(inode->i_cdev, struct chlg_registered_dev, ced_cdev);
601
602         OBD_ALLOC_PTR(crs);
603         if (!crs)
604                 RETURN(-ENOMEM);
605
606         kref_get(&dev->ced_refs);
607         crs->crs_ced = dev;
608         crs->crs_err = false;
609         crs->crs_eof = false;
610
611         mutex_init(&crs->crs_lock);
612         INIT_LIST_HEAD(&crs->crs_rec_queue);
613         init_waitqueue_head(&crs->crs_waitq_prod);
614         init_waitqueue_head(&crs->crs_waitq_cons);
615
616         if (file->f_mode & FMODE_READ) {
617                 task = kthread_run(chlg_load, crs, "chlg_load_thread");
618                 if (IS_ERR(task)) {
619                         rc = PTR_ERR(task);
620                         CERROR("%s: cannot start changelog thread: rc = %d\n",
621                                dev->ced_name, rc);
622                         GOTO(err_crs, rc);
623                 }
624                 crs->crs_prod_task = task;
625         }
626
627         file->private_data = crs;
628         RETURN(0);
629
630 err_crs:
631         kref_put(&dev->ced_refs, chlg_dev_clear);
632         OBD_FREE_PTR(crs);
633         return rc;
634 }
635
636 /**
637  * Close handler, release resources.
638  *
639  * @param[in]  inode  Inode struct for the open character device.
640  * @param[in]  file   Corresponding file pointer.
641  * @return 0 on success, negated error code on failure.
642  */
643 static int chlg_release(struct inode *inode, struct file *file)
644 {
645         struct chlg_reader_state *crs = file->private_data;
646         struct chlg_rec_entry *rec;
647         struct chlg_rec_entry *tmp;
648         int rc = 0;
649
650         if (crs->crs_prod_task)
651                 rc = kthread_stop(crs->crs_prod_task);
652
653         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage)
654                 enq_record_delete(rec);
655
656         kref_put(&crs->crs_ced->ced_refs, chlg_dev_clear);
657         OBD_FREE_PTR(crs);
658
659         return rc;
660 }
661
662 /**
663  * Poll handler, indicates whether the device is readable (new records) and
664  * writable (always).
665  *
666  * @param[in]  file   Device file pointer.
667  * @param[in]  wait   (opaque)
668  * @return combination of the poll status flags.
669  */
670 static unsigned int chlg_poll(struct file *file, poll_table *wait)
671 {
672         struct chlg_reader_state *crs = file->private_data;
673         unsigned int mask = 0;
674
675         mutex_lock(&crs->crs_lock);
676         poll_wait(file, &crs->crs_waitq_cons, wait);
677         if (crs->crs_rec_count > 0)
678                 mask |= POLLIN | POLLRDNORM;
679         if (crs->crs_err)
680                 mask |= POLLERR;
681         if (crs->crs_eof)
682                 mask |= POLLHUP;
683         mutex_unlock(&crs->crs_lock);
684         return mask;
685 }
686
687 static long chlg_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
688 {
689         int rc;
690
691         struct chlg_reader_state *crs = file->private_data;
692         switch (cmd) {
693         case OBD_IOC_CHLG_POLL:
694                 crs->crs_poll = !!arg;
695                 rc = 0;
696                 break;
697         default:
698                 rc = -EINVAL;
699                 break;
700         }
701         return rc;
702 }
703
704 static const struct file_operations chlg_fops = {
705         .owner          = THIS_MODULE,
706         .llseek         = chlg_llseek,
707         .read           = chlg_read,
708         .write          = chlg_write,
709         .open           = chlg_open,
710         .release        = chlg_release,
711         .poll           = chlg_poll,
712         .unlocked_ioctl = chlg_ioctl,
713 };
714
715 /**
716  * This uses obd_name of the form: "testfs-MDT0000-mdc-ffff88006501600"
717  * and returns a name of the form: "changelog-testfs-MDT0000".
718  */
719 static void get_target_name(char *name, size_t name_len, struct obd_device *obd)
720 {
721         int i;
722
723         snprintf(name, name_len, "%s", obd->obd_name);
724
725         /* Find the 2nd '-' from the end and truncate on it */
726         for (i = 0; i < 2; i++) {
727                 char *p = strrchr(name, '-');
728
729                 if (p == NULL)
730                         return;
731                 *p = '\0';
732         }
733 }
734
735 /**
736  * Find a changelog character device by name.
737  * All devices registered during MDC setup are listed in a global list with
738  * their names attached.
739  */
740 static struct chlg_registered_dev *
741 chlg_registered_dev_find_by_name(const char *name)
742 {
743         struct chlg_registered_dev *dit;
744
745         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
746         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
747                 if (strcmp(name, dit->ced_name) == 0)
748                         return dit;
749         return NULL;
750 }
751
752 /**
753  * Find chlg_registered_dev structure for a given OBD device.
754  * This is bad O(n^2) but for each filesystem:
755  *   - N is # of MDTs times # of mount points
756  *   - this only runs at shutdown
757  */
758 static struct chlg_registered_dev *
759 chlg_registered_dev_find_by_obd(const struct obd_device *obd)
760 {
761         struct chlg_registered_dev *dit;
762         struct obd_device *oit;
763
764         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
765         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
766                 list_for_each_entry(oit, &dit->ced_obds,
767                                     u.cli.cl_chg_dev_linkage)
768                         if (oit == obd)
769                                 return dit;
770         return NULL;
771 }
772
773 /**
774  * Changelog character device initialization.
775  * Register a misc character device with a dynamic minor number, under a name
776  * of the form: 'changelog-fsname-MDTxxxx'. Reference this OBD device with it.
777  *
778  * @param[in] obd  This MDC obd_device.
779  * @return 0 on success, negated error code on failure.
780  */
781 int mdc_changelog_cdev_init(struct obd_device *obd)
782 {
783         struct chlg_registered_dev *exist;
784         struct chlg_registered_dev *entry;
785         int minor, rc;
786         ENTRY;
787
788         OBD_ALLOC_PTR(entry);
789         if (entry == NULL)
790                 RETURN(-ENOMEM);
791
792         get_target_name(entry->ced_name, sizeof(entry->ced_name), obd);
793
794         kref_init(&entry->ced_refs);
795         INIT_LIST_HEAD(&entry->ced_obds);
796         INIT_LIST_HEAD(&entry->ced_link);
797
798         mutex_lock(&chlg_registered_dev_lock);
799         exist = chlg_registered_dev_find_by_name(entry->ced_name);
800         if (exist != NULL) {
801                 kref_get(&exist->ced_refs);
802                 list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &exist->ced_obds);
803                 GOTO(out_unlock, rc = 0);
804         }
805
806         list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds);
807         list_add_tail(&entry->ced_link, &chlg_registered_devices);
808
809         rc = chlg_minor_alloc(&minor);
810         if (rc)
811                 GOTO(out_unlock, rc);
812
813         device_initialize(&entry->ced_device);
814         entry->ced_device.devt = MKDEV(MAJOR(mdc_changelog_dev), minor);
815         entry->ced_device.class = mdc_changelog_class;
816         entry->ced_device.release = chlg_device_release;
817         dev_set_drvdata(&entry->ced_device, entry);
818         rc = dev_set_name(&entry->ced_device, "%s-%s", MDC_CHANGELOG_DEV_NAME,
819                           entry->ced_name);
820         if (rc)
821                 GOTO(out_minor, rc);
822
823         /* Register new character device */
824         cdev_init(&entry->ced_cdev, &chlg_fops);
825         entry->ced_cdev.owner = THIS_MODULE;
826         rc = cdev_device_add(&entry->ced_cdev, &entry->ced_device);
827         if (rc)
828                 GOTO(out_device_name, rc);
829
830         entry = NULL;   /* prevent it from being freed below */
831         GOTO(out_unlock, rc = 0);
832
833 out_device_name:
834         kfree_const(entry->ced_device.kobj.name);
835
836 out_minor:
837         chlg_minor_free(minor);
838
839         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
840         list_del(&entry->ced_link);
841
842 out_unlock:
843         mutex_unlock(&chlg_registered_dev_lock);
844         if (entry)
845                 OBD_FREE_PTR(entry);
846         RETURN(rc);
847 }
848
849 /**
850  * Release OBD, decrease reference count of the corresponding changelog device.
851  */
852 void mdc_changelog_cdev_finish(struct obd_device *obd)
853 {
854         struct chlg_registered_dev *dev;
855
856         ENTRY;
857         mutex_lock(&chlg_registered_dev_lock);
858         dev = chlg_registered_dev_find_by_obd(obd);
859         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
860         kref_put(&dev->ced_refs, chlg_dev_clear);
861         mutex_unlock(&chlg_registered_dev_lock);
862         EXIT;
863 }