Whamcloud - gitweb
ee11f6bfdc69c8f36bd5c76aecb3c75f9417342b
[fs/lustre-release.git] / lustre / mdc / mdc_changelog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Commissariat a l'Energie Atomique et aux Energies
24  *                     Alternatives.
25  *
26  * Copyright (c) 2017, Intel Corporation.
27  *
28  * Author: Henri Doreau <henri.doreau@cea.fr>
29  */
30
31 #define DEBUG_SUBSYSTEM S_MDC
32
33 #include <linux/init.h>
34 #include <linux/kthread.h>
35 #include <linux/poll.h>
36 #include <linux/device.h>
37 #include <linux/cdev.h>
38
39 #include <lustre_log.h>
40 #include <uapi/linux/lustre/lustre_ioctl.h>
41
42 #include "mdc_internal.h"
43
44
45 /*
46  * -- Changelog delivery through character device --
47  */
48
49 /**
50  * Mutex to protect chlg_registered_devices below
51  */
52 static DEFINE_MUTEX(chlg_registered_dev_lock);
53
54 /**
55  * Global linked list of all registered devices (one per MDT).
56  */
57 static LIST_HEAD(chlg_registered_devices);
58
59
60 struct chlg_registered_dev {
61         /* Device name of the form "changelog-{MDTNAME}" */
62         char                     ced_name[32];
63         /* changelog char device */
64         struct cdev              ced_cdev;
65         struct device            ced_device;
66         /* OBDs referencing this device (multiple mount point) */
67         struct list_head         ced_obds;
68         /* Reference counter for proper deregistration */
69         struct kref              ced_refs;
70         /* Link within the global chlg_registered_devices */
71         struct list_head         ced_link;
72 };
73
74 struct chlg_reader_state {
75         /* Shortcut to the corresponding OBD device */
76         struct obd_device          *crs_obd;
77         /* the corresponding chlg_registered_dev */
78         struct chlg_registered_dev *crs_ced;
79         /* Producer thread (if any) */
80         struct task_struct         *crs_prod_task;
81         /* An error occurred that prevents from reading further */
82         int                         crs_err;
83         /* EOF, no more records available */
84         bool                        crs_eof;
85         /* Desired start position */
86         __u64                       crs_start_offset;
87         /* Wait queue for the catalog processing thread */
88         wait_queue_head_t           crs_waitq_prod;
89         /* Wait queue for the record copy threads */
90         wait_queue_head_t           crs_waitq_cons;
91         /* Mutex protecting crs_rec_count and crs_rec_queue */
92         struct mutex                crs_lock;
93         /* Number of item in the list */
94         __u64                       crs_rec_count;
95         /* List of prefetched enqueued_record::enq_linkage_items */
96         struct list_head            crs_rec_queue;
97         unsigned int                crs_last_catidx;
98         unsigned int                crs_last_idx;
99         bool                        crs_poll;
100 };
101
102 struct chlg_rec_entry {
103         /* Link within the chlg_reader_state::crs_rec_queue list */
104         struct list_head        enq_linkage;
105         /* Data (enq_record) field length */
106         __u64                   enq_length;
107         /* Copy of a changelog record (see struct llog_changelog_rec) */
108         struct changelog_rec    enq_record[];
109 };
110
111 enum {
112         /* Number of records to prefetch locally. */
113         CDEV_CHLG_MAX_PREFETCH = 1024,
114 };
115
116 DEFINE_IDR(mdc_changelog_minor_idr);
117 static DEFINE_SPINLOCK(chlg_minor_lock);
118
119 static int chlg_minor_alloc(int *pminor)
120 {
121         void *minor_allocated = (void *)-1;
122         int minor;
123
124         idr_preload(GFP_KERNEL);
125         spin_lock(&chlg_minor_lock);
126         minor = idr_alloc(&mdc_changelog_minor_idr, minor_allocated, 0,
127                           MDC_CHANGELOG_DEV_COUNT, GFP_NOWAIT);
128         spin_unlock(&chlg_minor_lock);
129         idr_preload_end();
130
131         if (minor < 0)
132                 return minor;
133
134         *pminor = minor;
135         return 0;
136 }
137
138 static void chlg_minor_free(int minor)
139 {
140         spin_lock(&chlg_minor_lock);
141         idr_remove(&mdc_changelog_minor_idr, minor);
142         spin_unlock(&chlg_minor_lock);
143 }
144
145 static void chlg_device_release(struct device *dev)
146 {
147         struct chlg_registered_dev *entry = dev_get_drvdata(dev);
148
149         chlg_minor_free(MINOR(entry->ced_cdev.dev));
150         OBD_FREE_PTR(entry);
151 }
152
153 /**
154  * Deregister a changelog character device whose refcount has reached zero.
155  */
156 static void chlg_dev_clear(struct kref *kref)
157 {
158         struct chlg_registered_dev *entry;
159
160         ENTRY;
161         entry = container_of(kref, struct chlg_registered_dev,
162                              ced_refs);
163
164         list_del(&entry->ced_link);
165         cdev_device_del(&entry->ced_cdev, &entry->ced_device);
166         put_device(&entry->ced_device);
167         EXIT;
168 }
169
170 static inline struct obd_device* chlg_obd_get(struct chlg_registered_dev *dev)
171 {
172         struct obd_device *obd;
173
174         mutex_lock(&chlg_registered_dev_lock);
175         if (list_empty(&dev->ced_obds))
176                 return NULL;
177
178         obd = list_first_entry(&dev->ced_obds, struct obd_device,
179                                u.cli.cl_chg_dev_linkage);
180         class_incref(obd, "changelog", dev);
181         mutex_unlock(&chlg_registered_dev_lock);
182         return obd;
183 }
184
185 static inline void chlg_obd_put(struct chlg_registered_dev *dev,
186                          struct obd_device *obd)
187 {
188         class_decref(obd, "changelog", dev);
189 }
190
191 /**
192  * ChangeLog catalog processing callback invoked on each record.
193  * If the current record is eligible to userland delivery, push
194  * it into the crs_rec_queue where the consumer code will fetch it.
195  *
196  * @param[in]     env  (unused)
197  * @param[in]     llh  Client-side handle used to identify the llog
198  * @param[in]     hdr  Header of the current llog record
199  * @param[in,out] data chlg_reader_state passed from caller
200  *
201  * @return 0 or LLOG_PROC_* control code on success, negated error on failure.
202  */
203 static int chlg_read_cat_process_cb(const struct lu_env *env,
204                                     struct llog_handle *llh,
205                                     struct llog_rec_hdr *hdr, void *data)
206 {
207         struct llog_changelog_rec *rec;
208         struct chlg_reader_state *crs = data;
209         struct chlg_rec_entry *enq;
210         size_t len;
211         int rc;
212         ENTRY;
213
214         LASSERT(crs != NULL);
215         LASSERT(hdr != NULL);
216
217         rec = container_of(hdr, struct llog_changelog_rec, cr_hdr);
218
219         crs->crs_last_catidx = llh->lgh_hdr->llh_cat_idx;
220         crs->crs_last_idx = hdr->lrh_index;
221
222         if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
223                 rc = -EINVAL;
224                 CERROR("%s: not a changelog rec %x/%d in llog "DFID" rc = %d\n",
225                        crs->crs_obd->obd_name, rec->cr_hdr.lrh_type,
226                        rec->cr.cr_type,
227                        PFID(lu_object_fid(&llh->lgh_obj->do_lu)), rc);
228                 RETURN(rc);
229         }
230
231         /* Skip undesired records */
232         if (rec->cr.cr_index < crs->crs_start_offset)
233                 RETURN(0);
234
235         CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n",
236                rec->cr.cr_index, rec->cr.cr_type,
237                changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
238                rec->cr.cr_flags & CLF_FLAGMASK,
239                PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
240                rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
241
242         wait_event_interruptible(crs->crs_waitq_prod,
243                                  crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH ||
244                                  kthread_should_stop());
245
246         if (kthread_should_stop())
247                 RETURN(LLOG_PROC_BREAK);
248
249         len = changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
250         OBD_ALLOC(enq, sizeof(*enq) + len);
251         if (enq == NULL)
252                 RETURN(-ENOMEM);
253
254         INIT_LIST_HEAD(&enq->enq_linkage);
255         enq->enq_length = len;
256         memcpy(enq->enq_record, &rec->cr, len);
257
258         mutex_lock(&crs->crs_lock);
259         list_add_tail(&enq->enq_linkage, &crs->crs_rec_queue);
260         crs->crs_rec_count++;
261         mutex_unlock(&crs->crs_lock);
262
263         wake_up_all(&crs->crs_waitq_cons);
264
265         RETURN(0);
266 }
267
268 /**
269  * Remove record from the list it is attached to and free it.
270  */
271 static void enq_record_delete(struct chlg_rec_entry *rec)
272 {
273         list_del(&rec->enq_linkage);
274         OBD_FREE(rec, sizeof(*rec) + rec->enq_length);
275 }
276
277 /**
278  * Record prefetch thread entry point. Opens the changelog catalog and starts
279  * reading records.
280  *
281  * @param[in,out]  args  chlg_reader_state passed from caller.
282  * @return 0 on success, negated error code on failure.
283  */
284 static int chlg_load(void *args)
285 {
286         struct chlg_reader_state *crs = args;
287         struct chlg_registered_dev *ced = crs->crs_ced;
288         struct obd_device *obd = NULL;
289         struct llog_ctxt *ctx = NULL;
290         struct llog_handle *llh = NULL;
291         int rc;
292         ENTRY;
293
294         crs->crs_last_catidx = 0;
295         crs->crs_last_idx = 0;
296
297 again:
298         obd = chlg_obd_get(ced);
299         if (obd == NULL)
300                 RETURN(-ENODEV);
301
302         crs->crs_obd = obd;
303
304         ctx = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
305         if (ctx == NULL)
306                 GOTO(err_out, rc = -ENOENT);
307
308         rc = llog_open(NULL, ctx, &llh, NULL, CHANGELOG_CATALOG,
309                        LLOG_OPEN_EXISTS);
310         if (rc) {
311                 CERROR("%s: fail to open changelog catalog: rc = %d\n",
312                        obd->obd_name, rc);
313                 GOTO(err_out, rc);
314         }
315
316
317         rc = llog_init_handle(NULL, llh,
318                               LLOG_F_IS_CAT |
319                               LLOG_F_EXT_JOBID |
320                               LLOG_F_EXT_EXTRA_FLAGS |
321                               LLOG_F_EXT_X_UIDGID |
322                               LLOG_F_EXT_X_NID |
323                               LLOG_F_EXT_X_OMODE |
324                               LLOG_F_EXT_X_XATTR,
325                               NULL);
326         if (rc) {
327                 CERROR("%s: fail to init llog handle: rc = %d\n",
328                        obd->obd_name, rc);
329                 GOTO(err_out, rc);
330         }
331
332         rc = llog_cat_process(NULL, llh, chlg_read_cat_process_cb, crs,
333                                 crs->crs_last_catidx, crs->crs_last_idx);
334         if (rc < 0) {
335                 CERROR("%s: fail to process llog: rc = %d\n", obd->obd_name, rc);
336                 GOTO(err_out, rc);
337         }
338         if (!kthread_should_stop() && crs->crs_poll) {
339                 llog_cat_close(NULL, llh);
340                 llog_ctxt_put(ctx);
341                 class_decref(obd, "changelog", crs);
342                 schedule_timeout_interruptible(cfs_time_seconds(1));
343                 goto again;
344         }
345
346         crs->crs_eof = true;
347
348 err_out:
349         if (rc < 0)
350                 crs->crs_err = rc;
351
352         wake_up_all(&crs->crs_waitq_cons);
353
354         if (llh != NULL)
355                 llog_cat_close(NULL, llh);
356
357         if (ctx != NULL)
358                 llog_ctxt_put(ctx);
359
360         crs->crs_obd = NULL;
361         chlg_obd_put(ced, obd);
362         wait_event_interruptible(crs->crs_waitq_prod, kthread_should_stop());
363
364         RETURN(rc);
365 }
366
367 /**
368  * Read handler, dequeues records from the chlg_reader_state if any.
369  * No partial records are copied to userland so this function can return less
370  * data than required (short read).
371  *
372  * @param[in]   file   File pointer to the character device.
373  * @param[out]  buff   Userland buffer where to copy the records.
374  * @param[in]   count  Userland buffer size.
375  * @param[out]  ppos   File position, updated with the index number of the next
376  *                     record to read.
377  * @return number of copied bytes on success, negated error code on failure.
378  */
379 static ssize_t chlg_read(struct file *file, char __user *buff, size_t count,
380                          loff_t *ppos)
381 {
382         struct chlg_reader_state *crs = file->private_data;
383         struct chlg_rec_entry *rec;
384         struct chlg_rec_entry *tmp;
385         size_t written_total = 0;
386         ssize_t rc;
387         LIST_HEAD(consumed);
388         ENTRY;
389
390         if (file->f_flags & O_NONBLOCK && crs->crs_rec_count == 0) {
391                 if (crs->crs_err < 0)
392                         RETURN(crs->crs_err);
393                 else if (crs->crs_eof)
394                         RETURN(0);
395                 else
396                         RETURN(-EAGAIN);
397         }
398
399         rc = wait_event_interruptible(crs->crs_waitq_cons,
400                         crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err);
401
402         mutex_lock(&crs->crs_lock);
403         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
404                 if (written_total + rec->enq_length > count)
405                         break;
406
407                 if (copy_to_user(buff, rec->enq_record, rec->enq_length)) {
408                         rc = -EFAULT;
409                         break;
410                 }
411
412                 buff += rec->enq_length;
413                 written_total += rec->enq_length;
414
415                 crs->crs_rec_count--;
416                 list_move_tail(&rec->enq_linkage, &consumed);
417
418                 crs->crs_start_offset = rec->enq_record->cr_index + 1;
419         }
420         mutex_unlock(&crs->crs_lock);
421
422         if (written_total > 0) {
423                 rc = written_total;
424                 wake_up_all(&crs->crs_waitq_prod);
425         } else if (rc == 0) {
426                 rc = crs->crs_err;
427         }
428
429         list_for_each_entry_safe(rec, tmp, &consumed, enq_linkage)
430                 enq_record_delete(rec);
431
432         *ppos = crs->crs_start_offset;
433
434         RETURN(rc);
435 }
436
437 /**
438  * Jump to a given record index. Helper for chlg_llseek().
439  *
440  * @param[in,out]  crs     Internal reader state.
441  * @param[in]      offset  Desired offset (index record).
442  * @return 0 on success, negated error code on failure.
443  */
444 static int chlg_set_start_offset(struct chlg_reader_state *crs, __u64 offset)
445 {
446         struct chlg_rec_entry *rec;
447         struct chlg_rec_entry *tmp;
448
449         mutex_lock(&crs->crs_lock);
450         if (offset < crs->crs_start_offset) {
451                 mutex_unlock(&crs->crs_lock);
452                 return -ERANGE;
453         }
454
455         crs->crs_start_offset = offset;
456         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
457                 struct changelog_rec *cr = rec->enq_record;
458
459                 if (cr->cr_index >= crs->crs_start_offset)
460                         break;
461
462                 crs->crs_rec_count--;
463                 enq_record_delete(rec);
464         }
465
466         mutex_unlock(&crs->crs_lock);
467         wake_up_all(&crs->crs_waitq_prod);
468         return 0;
469 }
470
471 /**
472  * Move read pointer to a certain record index, encoded as an offset.
473  *
474  * @param[in,out] file   File pointer to the changelog character device
475  * @param[in]     off    Offset to skip, actually a record index, not byte count
476  * @param[in]     whence Relative/Absolute interpretation of the offset
477  * @return the resulting position on success or negated error code on failure.
478  */
479 static loff_t chlg_llseek(struct file *file, loff_t off, int whence)
480 {
481         struct chlg_reader_state *crs = file->private_data;
482         loff_t pos;
483         int rc;
484
485         switch (whence) {
486         case SEEK_SET:
487                 pos = off;
488                 break;
489         case SEEK_CUR:
490                 pos = file->f_pos + off;
491                 break;
492         case SEEK_END:
493         default:
494                 return -EINVAL;
495         }
496
497         /* We cannot go backward */
498         if (pos < file->f_pos)
499                 return -EINVAL;
500
501         rc = chlg_set_start_offset(crs, pos);
502         if (rc != 0)
503                 return rc;
504
505         file->f_pos = pos;
506         return pos;
507 }
508
509 /**
510  * Clear record range for a given changelog reader.
511  *
512  * @param[in]  crs     Current internal state.
513  * @param[in]  reader  Changelog reader ID (cl1, cl2...)
514  * @param[in]  record  Record index up which to clear
515  * @return 0 on success, negated error code on failure.
516  */
517 static int chlg_clear(struct chlg_reader_state *crs, __u32 reader, __u64 record)
518 {
519         struct obd_device *obd = NULL;
520         struct changelog_setinfo cs  = {
521                 .cs_recno = record,
522                 .cs_id    = reader
523         };
524         int rc;
525
526         obd = chlg_obd_get(crs->crs_ced);
527         if (obd == NULL)
528                 return -ENODEV;
529
530         rc = obd_set_info_async(NULL, obd->obd_self_export,
531                                 strlen(KEY_CHANGELOG_CLEAR),
532                                 KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL);
533
534         chlg_obd_put(crs->crs_ced, obd);
535         return rc;
536 }
537
538 /** Maximum changelog control command size */
539 #define CHLG_CONTROL_CMD_MAX    64
540
541 /**
542  * Handle writes() into the changelog character device. Write() can be used
543  * to request special control operations.
544  *
545  * @param[in]  file  File pointer to the changelog character device
546  * @param[in]  buff  User supplied data (written data)
547  * @param[in]  count Number of written bytes
548  * @param[in]  off   (unused)
549  * @return number of written bytes on success, negated error code on failure.
550  */
551 static ssize_t chlg_write(struct file *file, const char __user *buff,
552                           size_t count, loff_t *off)
553 {
554         struct chlg_reader_state *crs = file->private_data;
555         char *kbuf;
556         __u64 record;
557         __u32 reader;
558         int rc = 0;
559         ENTRY;
560
561         if (count > CHLG_CONTROL_CMD_MAX)
562                 RETURN(-EINVAL);
563
564         OBD_ALLOC(kbuf, CHLG_CONTROL_CMD_MAX);
565         if (kbuf == NULL)
566                 RETURN(-ENOMEM);
567
568         if (copy_from_user(kbuf, buff, count))
569                 GOTO(out_kbuf, rc = -EFAULT);
570
571         kbuf[CHLG_CONTROL_CMD_MAX - 1] = '\0';
572
573         if (sscanf(kbuf, "clear:cl%u:%llu", &reader, &record) == 2)
574                 rc = chlg_clear(crs, reader, record);
575         else
576                 rc = -EINVAL;
577
578         EXIT;
579 out_kbuf:
580         OBD_FREE(kbuf, CHLG_CONTROL_CMD_MAX);
581         return rc < 0 ? rc : count;
582 }
583
584 /**
585  * Open handler, initialize internal CRS state and spawn prefetch thread if
586  * needed.
587  * @param[in]  inode  Inode struct for the open character device.
588  * @param[in]  file   Corresponding file pointer.
589  * @return 0 on success, negated error code on failure.
590  */
591 static int chlg_open(struct inode *inode, struct file *file)
592 {
593         struct chlg_reader_state *crs;
594         struct chlg_registered_dev *dev;
595         struct task_struct *task;
596         int rc;
597         ENTRY;
598
599         dev = container_of(inode->i_cdev, struct chlg_registered_dev, ced_cdev);
600
601         OBD_ALLOC_PTR(crs);
602         if (!crs)
603                 RETURN(-ENOMEM);
604
605         kref_get(&dev->ced_refs);
606         crs->crs_ced = dev;
607         crs->crs_err = false;
608         crs->crs_eof = false;
609
610         mutex_init(&crs->crs_lock);
611         INIT_LIST_HEAD(&crs->crs_rec_queue);
612         init_waitqueue_head(&crs->crs_waitq_prod);
613         init_waitqueue_head(&crs->crs_waitq_cons);
614
615         if (file->f_mode & FMODE_READ) {
616                 task = kthread_run(chlg_load, crs, "chlg_load_thread");
617                 if (IS_ERR(task)) {
618                         rc = PTR_ERR(task);
619                         CERROR("%s: cannot start changelog thread: rc = %d\n",
620                                dev->ced_name, rc);
621                         GOTO(err_crs, rc);
622                 }
623                 crs->crs_prod_task = task;
624         }
625
626         file->private_data = crs;
627         RETURN(0);
628
629 err_crs:
630         kref_put(&dev->ced_refs, chlg_dev_clear);
631         OBD_FREE_PTR(crs);
632         return rc;
633 }
634
635 /**
636  * Close handler, release resources.
637  *
638  * @param[in]  inode  Inode struct for the open character device.
639  * @param[in]  file   Corresponding file pointer.
640  * @return 0 on success, negated error code on failure.
641  */
642 static int chlg_release(struct inode *inode, struct file *file)
643 {
644         struct chlg_reader_state *crs = file->private_data;
645         struct chlg_rec_entry *rec;
646         struct chlg_rec_entry *tmp;
647         int rc = 0;
648
649         if (crs->crs_prod_task)
650                 rc = kthread_stop(crs->crs_prod_task);
651
652         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage)
653                 enq_record_delete(rec);
654
655         kref_put(&crs->crs_ced->ced_refs, chlg_dev_clear);
656         OBD_FREE_PTR(crs);
657
658         return rc;
659 }
660
661 /**
662  * Poll handler, indicates whether the device is readable (new records) and
663  * writable (always).
664  *
665  * @param[in]  file   Device file pointer.
666  * @param[in]  wait   (opaque)
667  * @return combination of the poll status flags.
668  */
669 static unsigned int chlg_poll(struct file *file, poll_table *wait)
670 {
671         struct chlg_reader_state *crs = file->private_data;
672         unsigned int mask = 0;
673
674         mutex_lock(&crs->crs_lock);
675         poll_wait(file, &crs->crs_waitq_cons, wait);
676         if (crs->crs_rec_count > 0)
677                 mask |= POLLIN | POLLRDNORM;
678         if (crs->crs_err)
679                 mask |= POLLERR;
680         if (crs->crs_eof)
681                 mask |= POLLHUP;
682         mutex_unlock(&crs->crs_lock);
683         return mask;
684 }
685
686 static long chlg_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
687 {
688         int rc;
689
690         struct chlg_reader_state *crs = file->private_data;
691         switch (cmd) {
692         case OBD_IOC_CHLG_POLL:
693                 crs->crs_poll = !!arg;
694                 rc = 0;
695                 break;
696         default:
697                 rc = -EINVAL;
698                 break;
699         }
700         return rc;
701 }
702
703 static const struct file_operations chlg_fops = {
704         .owner          = THIS_MODULE,
705         .llseek         = chlg_llseek,
706         .read           = chlg_read,
707         .write          = chlg_write,
708         .open           = chlg_open,
709         .release        = chlg_release,
710         .poll           = chlg_poll,
711         .unlocked_ioctl = chlg_ioctl,
712 };
713
714 /**
715  * This uses obd_name of the form: "testfs-MDT0000-mdc-ffff88006501600"
716  * and returns a name of the form: "changelog-testfs-MDT0000".
717  */
718 static void get_target_name(char *name, size_t name_len, struct obd_device *obd)
719 {
720         int i;
721
722         snprintf(name, name_len, "%s", obd->obd_name);
723
724         /* Find the 2nd '-' from the end and truncate on it */
725         for (i = 0; i < 2; i++) {
726                 char *p = strrchr(name, '-');
727
728                 if (p == NULL)
729                         return;
730                 *p = '\0';
731         }
732 }
733
734 /**
735  * Find a changelog character device by name.
736  * All devices registered during MDC setup are listed in a global list with
737  * their names attached.
738  */
739 static struct chlg_registered_dev *
740 chlg_registered_dev_find_by_name(const char *name)
741 {
742         struct chlg_registered_dev *dit;
743
744         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
745         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
746                 if (strcmp(name, dit->ced_name) == 0)
747                         return dit;
748         return NULL;
749 }
750
751 /**
752  * Find chlg_registered_dev structure for a given OBD device.
753  * This is bad O(n^2) but for each filesystem:
754  *   - N is # of MDTs times # of mount points
755  *   - this only runs at shutdown
756  */
757 static struct chlg_registered_dev *
758 chlg_registered_dev_find_by_obd(const struct obd_device *obd)
759 {
760         struct chlg_registered_dev *dit;
761         struct obd_device *oit;
762
763         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
764         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
765                 list_for_each_entry(oit, &dit->ced_obds,
766                                     u.cli.cl_chg_dev_linkage)
767                         if (oit == obd)
768                                 return dit;
769         return NULL;
770 }
771
772 /**
773  * Changelog character device initialization.
774  * Register a misc character device with a dynamic minor number, under a name
775  * of the form: 'changelog-fsname-MDTxxxx'. Reference this OBD device with it.
776  *
777  * @param[in] obd  This MDC obd_device.
778  * @return 0 on success, negated error code on failure.
779  */
780 int mdc_changelog_cdev_init(struct obd_device *obd)
781 {
782         struct chlg_registered_dev *exist;
783         struct chlg_registered_dev *entry;
784         int minor, rc;
785         ENTRY;
786
787         OBD_ALLOC_PTR(entry);
788         if (entry == NULL)
789                 RETURN(-ENOMEM);
790
791         get_target_name(entry->ced_name, sizeof(entry->ced_name), obd);
792
793         kref_init(&entry->ced_refs);
794         INIT_LIST_HEAD(&entry->ced_obds);
795         INIT_LIST_HEAD(&entry->ced_link);
796
797         mutex_lock(&chlg_registered_dev_lock);
798         exist = chlg_registered_dev_find_by_name(entry->ced_name);
799         if (exist != NULL) {
800                 kref_get(&exist->ced_refs);
801                 list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &exist->ced_obds);
802                 GOTO(out_unlock, rc = 0);
803         }
804
805         list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds);
806         list_add_tail(&entry->ced_link, &chlg_registered_devices);
807
808         rc = chlg_minor_alloc(&minor);
809         if (rc)
810                 GOTO(out_unlock, rc);
811
812         device_initialize(&entry->ced_device);
813         entry->ced_device.devt = MKDEV(MAJOR(mdc_changelog_dev), minor);
814         entry->ced_device.class = mdc_changelog_class;
815         entry->ced_device.release = chlg_device_release;
816         dev_set_drvdata(&entry->ced_device, entry);
817         rc = dev_set_name(&entry->ced_device, "%s-%s", MDC_CHANGELOG_DEV_NAME,
818                           entry->ced_name);
819         if (rc)
820                 GOTO(out_minor, rc);
821
822         /* Register new character device */
823         cdev_init(&entry->ced_cdev, &chlg_fops);
824         entry->ced_cdev.owner = THIS_MODULE;
825         rc = cdev_device_add(&entry->ced_cdev, &entry->ced_device);
826         if (rc)
827                 GOTO(out_device_name, rc);
828
829         entry = NULL;   /* prevent it from being freed below */
830         GOTO(out_unlock, rc = 0);
831
832 out_device_name:
833         kfree_const(entry->ced_device.kobj.name);
834
835 out_minor:
836         chlg_minor_free(minor);
837
838         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
839         list_del(&entry->ced_link);
840
841 out_unlock:
842         mutex_unlock(&chlg_registered_dev_lock);
843         if (entry)
844                 OBD_FREE_PTR(entry);
845         RETURN(rc);
846 }
847
848 /**
849  * Release OBD, decrease reference count of the corresponding changelog device.
850  */
851 void mdc_changelog_cdev_finish(struct obd_device *obd)
852 {
853         struct chlg_registered_dev *dev;
854
855         ENTRY;
856         mutex_lock(&chlg_registered_dev_lock);
857         dev = chlg_registered_dev_find_by_obd(obd);
858         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
859         kref_put(&dev->ced_refs, chlg_dev_clear);
860         mutex_unlock(&chlg_registered_dev_lock);
861         EXIT;
862 }