Whamcloud - gitweb
LU-15850 mdt: pack default LMV in open reply
[fs/lustre-release.git] / lustre / mdc / mdc_changelog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Commissariat a l'Energie Atomique et aux Energies
24  *                     Alternatives.
25  *
26  * Copyright (c) 2017, Intel Corporation.
27  *
28  * Author: Henri Doreau <henri.doreau@cea.fr>
29  */
30
31 #define DEBUG_SUBSYSTEM S_MDC
32
33 #include <linux/init.h>
34 #include <linux/kthread.h>
35 #include <linux/poll.h>
36 #include <linux/device.h>
37 #include <linux/cdev.h>
38 #include <linux/idr.h>
39
40 #include <lustre_log.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42
43 #include "mdc_internal.h"
44
45
46 /*
47  * -- Changelog delivery through character device --
48  */
49
50 /**
51  * Mutex to protect chlg_registered_devices below
52  */
53 static DEFINE_MUTEX(chlg_registered_dev_lock);
54
55 /**
56  * Global linked list of all registered devices (one per MDT).
57  */
58 static LIST_HEAD(chlg_registered_devices);
59
60
61 struct chlg_registered_dev {
62         /* Device name of the form "changelog-{MDTNAME}" */
63         char                     ced_name[32];
64         /* changelog char device */
65         struct cdev              ced_cdev;
66         struct device            ced_device;
67         /* OBDs referencing this device (multiple mount point) */
68         struct list_head         ced_obds;
69         /* Reference counter for proper deregistration */
70         struct kref              ced_refs;
71         /* Link within the global chlg_registered_devices */
72         struct list_head         ced_link;
73 };
74
75 struct chlg_reader_state {
76         /* Shortcut to the corresponding OBD device */
77         struct obd_device          *crs_obd;
78         /* the corresponding chlg_registered_dev */
79         struct chlg_registered_dev *crs_ced;
80         /* Producer thread (if any) */
81         struct task_struct         *crs_prod_task;
82         /* An error occurred that prevents from reading further */
83         int                         crs_err;
84         /* EOF, no more records available */
85         bool                        crs_eof;
86         /* Desired start position */
87         __u64                       crs_start_offset;
88         /* Wait queue for the catalog processing thread */
89         wait_queue_head_t           crs_waitq_prod;
90         /* Wait queue for the record copy threads */
91         wait_queue_head_t           crs_waitq_cons;
92         /* Mutex protecting crs_rec_count and crs_rec_queue */
93         struct mutex                crs_lock;
94         /* Number of item in the list */
95         __u64                       crs_rec_count;
96         /* List of prefetched enqueued_record::enq_linkage_items */
97         struct list_head            crs_rec_queue;
98         unsigned int                crs_last_catidx;
99         unsigned int                crs_last_idx;
100         bool                        crs_poll;
101 };
102
103 struct chlg_rec_entry {
104         /* Link within the chlg_reader_state::crs_rec_queue list */
105         struct list_head        enq_linkage;
106         /* Data (enq_record) field length */
107         __u64                   enq_length;
108         /* Copy of a changelog record (see struct llog_changelog_rec) */
109         struct changelog_rec    enq_record[];
110 };
111
112 enum {
113         /* Number of records to prefetch locally. */
114         CDEV_CHLG_MAX_PREFETCH = 1024,
115 };
116
117 DEFINE_IDR(mdc_changelog_minor_idr);
118 static DEFINE_SPINLOCK(chlg_minor_lock);
119
120 static int chlg_minor_alloc(int *pminor)
121 {
122         void *minor_allocated = (void *)-1;
123         int minor;
124
125         idr_preload(GFP_KERNEL);
126         spin_lock(&chlg_minor_lock);
127         minor = idr_alloc(&mdc_changelog_minor_idr, minor_allocated, 0,
128                           MDC_CHANGELOG_DEV_COUNT, GFP_NOWAIT);
129         spin_unlock(&chlg_minor_lock);
130         idr_preload_end();
131
132         if (minor < 0)
133                 return minor;
134
135         *pminor = minor;
136         return 0;
137 }
138
139 static void chlg_minor_free(int minor)
140 {
141         spin_lock(&chlg_minor_lock);
142         idr_remove(&mdc_changelog_minor_idr, minor);
143         spin_unlock(&chlg_minor_lock);
144 }
145
146 static void chlg_device_release(struct device *dev)
147 {
148         struct chlg_registered_dev *entry = dev_get_drvdata(dev);
149
150         chlg_minor_free(MINOR(entry->ced_cdev.dev));
151         OBD_FREE_PTR(entry);
152 }
153
154 /**
155  * Deregister a changelog character device whose refcount has reached zero.
156  */
157 static void chlg_dev_clear(struct kref *kref)
158 {
159         struct chlg_registered_dev *entry;
160
161         ENTRY;
162         entry = container_of(kref, struct chlg_registered_dev,
163                              ced_refs);
164
165         list_del(&entry->ced_link);
166         cdev_device_del(&entry->ced_cdev, &entry->ced_device);
167         put_device(&entry->ced_device);
168         EXIT;
169 }
170
171 static inline struct obd_device* chlg_obd_get(struct chlg_registered_dev *dev)
172 {
173         struct obd_device *obd;
174
175         mutex_lock(&chlg_registered_dev_lock);
176         if (list_empty(&dev->ced_obds))
177                 return NULL;
178
179         obd = list_first_entry(&dev->ced_obds, struct obd_device,
180                                u.cli.cl_chg_dev_linkage);
181         class_incref(obd, "changelog", dev);
182         mutex_unlock(&chlg_registered_dev_lock);
183         return obd;
184 }
185
186 static inline void chlg_obd_put(struct chlg_registered_dev *dev,
187                          struct obd_device *obd)
188 {
189         class_decref(obd, "changelog", dev);
190 }
191
192 /**
193  * ChangeLog catalog processing callback invoked on each record.
194  * If the current record is eligible to userland delivery, push
195  * it into the crs_rec_queue where the consumer code will fetch it.
196  *
197  * @param[in]     env  (unused)
198  * @param[in]     llh  Client-side handle used to identify the llog
199  * @param[in]     hdr  Header of the current llog record
200  * @param[in,out] data chlg_reader_state passed from caller
201  *
202  * @return 0 or LLOG_PROC_* control code on success, negated error on failure.
203  */
204 static int chlg_read_cat_process_cb(const struct lu_env *env,
205                                     struct llog_handle *llh,
206                                     struct llog_rec_hdr *hdr, void *data)
207 {
208         struct llog_changelog_rec *rec;
209         struct chlg_reader_state *crs = data;
210         struct chlg_rec_entry *enq;
211         size_t len;
212         int rc;
213         ENTRY;
214
215         LASSERT(crs != NULL);
216         LASSERT(hdr != NULL);
217
218         rec = container_of(hdr, struct llog_changelog_rec, cr_hdr);
219
220         crs->crs_last_catidx = llh->lgh_hdr->llh_cat_idx;
221         crs->crs_last_idx = hdr->lrh_index;
222
223         if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
224                 rc = -EINVAL;
225                 CERROR("%s: not a changelog rec %x/%d in llog : rc = %d\n",
226                        crs->crs_obd->obd_name, rec->cr_hdr.lrh_type,
227                        rec->cr.cr_type, rc);
228                 RETURN(rc);
229         }
230
231         /* Check if we can skip the entire llog plain */
232         if (llog_is_plain_skipable(llh->lgh_hdr, hdr, rec->cr.cr_index,
233                                    crs->crs_start_offset))
234                 RETURN(LLOG_SKIP_PLAIN);
235
236         /* Skip undesired records */
237         if (rec->cr.cr_index < crs->crs_start_offset)
238                 RETURN(0);
239
240         CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n",
241                rec->cr.cr_index, rec->cr.cr_type,
242                changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
243                rec->cr.cr_flags & CLF_FLAGMASK,
244                PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
245                rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
246
247         wait_event_interruptible(crs->crs_waitq_prod,
248                                  crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH ||
249                                  kthread_should_stop());
250
251         if (kthread_should_stop())
252                 RETURN(LLOG_PROC_BREAK);
253
254         len = changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
255         OBD_ALLOC(enq, sizeof(*enq) + len);
256         if (enq == NULL)
257                 RETURN(-ENOMEM);
258
259         INIT_LIST_HEAD(&enq->enq_linkage);
260         enq->enq_length = len;
261         memcpy(enq->enq_record, &rec->cr, len);
262
263         mutex_lock(&crs->crs_lock);
264         list_add_tail(&enq->enq_linkage, &crs->crs_rec_queue);
265         crs->crs_rec_count++;
266         mutex_unlock(&crs->crs_lock);
267
268         wake_up(&crs->crs_waitq_cons);
269
270         RETURN(0);
271 }
272
273 /**
274  * Remove record from the list it is attached to and free it.
275  */
276 static void enq_record_delete(struct chlg_rec_entry *rec)
277 {
278         list_del(&rec->enq_linkage);
279         OBD_FREE(rec, sizeof(*rec) + rec->enq_length);
280 }
281
282 /**
283  * Record prefetch thread entry point. Opens the changelog catalog and starts
284  * reading records.
285  *
286  * @param[in,out]  args  chlg_reader_state passed from caller.
287  * @return 0 on success, negated error code on failure.
288  */
289 static int chlg_load(void *args)
290 {
291         struct chlg_reader_state *crs = args;
292         struct chlg_registered_dev *ced = crs->crs_ced;
293         struct obd_device *obd = NULL;
294         struct llog_ctxt *ctx = NULL;
295         struct llog_handle *llh = NULL;
296         int rc;
297         ENTRY;
298
299         crs->crs_last_catidx = 0;
300         crs->crs_last_idx = 0;
301
302 again:
303         obd = chlg_obd_get(ced);
304         if (obd == NULL)
305                 RETURN(-ENODEV);
306
307         crs->crs_obd = obd;
308
309         ctx = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
310         if (ctx == NULL)
311                 GOTO(err_out, rc = -ENOENT);
312
313         rc = llog_open(NULL, ctx, &llh, NULL, CHANGELOG_CATALOG,
314                        LLOG_OPEN_EXISTS);
315         if (rc) {
316                 CERROR("%s: fail to open changelog catalog: rc = %d\n",
317                        obd->obd_name, rc);
318                 GOTO(err_out, rc);
319         }
320
321
322         rc = llog_init_handle(NULL, llh,
323                               LLOG_F_IS_CAT |
324                               LLOG_F_EXT_JOBID |
325                               LLOG_F_EXT_EXTRA_FLAGS |
326                               LLOG_F_EXT_X_UIDGID |
327                               LLOG_F_EXT_X_NID |
328                               LLOG_F_EXT_X_OMODE |
329                               LLOG_F_EXT_X_XATTR,
330                               NULL);
331         if (rc) {
332                 CERROR("%s: fail to init llog handle: rc = %d\n",
333                        obd->obd_name, rc);
334                 GOTO(err_out, rc);
335         }
336
337         rc = llog_cat_process(NULL, llh, chlg_read_cat_process_cb, crs,
338                                 crs->crs_last_catidx, crs->crs_last_idx);
339         if (rc < 0) {
340                 CERROR("%s: fail to process llog: rc = %d\n", obd->obd_name, rc);
341                 GOTO(err_out, rc);
342         }
343         if (!kthread_should_stop() && crs->crs_poll) {
344                 llog_cat_close(NULL, llh);
345                 llog_ctxt_put(ctx);
346                 class_decref(obd, "changelog", crs);
347                 schedule_timeout_interruptible(cfs_time_seconds(1));
348                 goto again;
349         }
350
351         crs->crs_eof = true;
352
353 err_out:
354         if (rc < 0)
355                 crs->crs_err = rc;
356
357         wake_up(&crs->crs_waitq_cons);
358
359         if (llh != NULL)
360                 llog_cat_close(NULL, llh);
361
362         if (ctx != NULL)
363                 llog_ctxt_put(ctx);
364
365         crs->crs_obd = NULL;
366         chlg_obd_put(ced, obd);
367         wait_event_interruptible(crs->crs_waitq_prod, kthread_should_stop());
368
369         RETURN(rc);
370 }
371
372 static int chlg_start_thread(struct file *file)
373 {
374         struct chlg_reader_state *crs = file->private_data;
375         struct task_struct *task;
376         int rc = 0;
377
378         if (likely(crs->crs_prod_task))
379                 return 0;
380         if (unlikely(file->f_mode & FMODE_READ) == 0)
381                 return 0;
382
383         mutex_lock(&crs->crs_lock);
384         if (crs->crs_prod_task == NULL) {
385                 task = kthread_run(chlg_load, crs, "chlg_load_thread");
386                 if (IS_ERR(task)) {
387                         rc = PTR_ERR(task);
388                         CERROR("%s: cannot start changelog thread: rc = %d\n",
389                                crs->crs_ced->ced_name, rc);
390                         GOTO(out, rc);
391                 }
392                 crs->crs_prod_task = task;
393         }
394 out:
395         mutex_unlock(&crs->crs_lock);
396         return rc;
397 }
398
399 /**
400  * Read handler, dequeues records from the chlg_reader_state if any.
401  * No partial records are copied to userland so this function can return less
402  * data than required (short read).
403  *
404  * @param[in]   file   File pointer to the character device.
405  * @param[out]  buff   Userland buffer where to copy the records.
406  * @param[in]   count  Userland buffer size.
407  * @param[out]  ppos   File position, updated with the index number of the next
408  *                     record to read.
409  * @return number of copied bytes on success, negated error code on failure.
410  */
411 static ssize_t chlg_read(struct file *file, char __user *buff, size_t count,
412                          loff_t *ppos)
413 {
414         struct chlg_reader_state *crs = file->private_data;
415         struct chlg_rec_entry *rec;
416         struct chlg_rec_entry *tmp;
417         size_t written_total = 0;
418         ssize_t rc;
419         LIST_HEAD(consumed);
420         ENTRY;
421
422         if (file->f_flags & O_NONBLOCK && crs->crs_rec_count == 0) {
423                 if (crs->crs_err < 0)
424                         RETURN(crs->crs_err);
425                 else if (crs->crs_eof)
426                         RETURN(0);
427                 else
428                         RETURN(-EAGAIN);
429         }
430
431         rc = chlg_start_thread(file);
432         if (rc)
433                 RETURN(rc);
434
435         rc = wait_event_interruptible(crs->crs_waitq_cons,
436                         crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err);
437
438         mutex_lock(&crs->crs_lock);
439         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
440                 if (written_total + rec->enq_length > count)
441                         break;
442
443                 if (copy_to_user(buff, rec->enq_record, rec->enq_length)) {
444                         rc = -EFAULT;
445                         break;
446                 }
447
448                 buff += rec->enq_length;
449                 written_total += rec->enq_length;
450
451                 crs->crs_rec_count--;
452                 list_move_tail(&rec->enq_linkage, &consumed);
453
454                 crs->crs_start_offset = rec->enq_record->cr_index + 1;
455         }
456         mutex_unlock(&crs->crs_lock);
457
458         if (written_total > 0) {
459                 rc = written_total;
460                 wake_up(&crs->crs_waitq_prod);
461         } else if (rc == 0) {
462                 rc = crs->crs_err;
463         }
464
465         list_for_each_entry_safe(rec, tmp, &consumed, enq_linkage)
466                 enq_record_delete(rec);
467
468         *ppos = crs->crs_start_offset;
469
470         RETURN(rc);
471 }
472
473 /**
474  * Jump to a given record index. Helper for chlg_llseek().
475  *
476  * @param[in,out]  crs     Internal reader state.
477  * @param[in]      offset  Desired offset (index record).
478  * @return 0 on success, negated error code on failure.
479  */
480 static int chlg_set_start_offset(struct chlg_reader_state *crs, __u64 offset)
481 {
482         struct chlg_rec_entry *rec;
483         struct chlg_rec_entry *tmp;
484
485         mutex_lock(&crs->crs_lock);
486         if (offset < crs->crs_start_offset) {
487                 mutex_unlock(&crs->crs_lock);
488                 return -ERANGE;
489         }
490
491         crs->crs_start_offset = offset;
492         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
493                 struct changelog_rec *cr = rec->enq_record;
494
495                 if (cr->cr_index >= crs->crs_start_offset)
496                         break;
497
498                 crs->crs_rec_count--;
499                 enq_record_delete(rec);
500         }
501
502         mutex_unlock(&crs->crs_lock);
503         wake_up(&crs->crs_waitq_prod);
504         return 0;
505 }
506
507 /**
508  * Move read pointer to a certain record index, encoded as an offset.
509  *
510  * @param[in,out] file   File pointer to the changelog character device
511  * @param[in]     off    Offset to skip, actually a record index, not byte count
512  * @param[in]     whence Relative/Absolute interpretation of the offset
513  * @return the resulting position on success or negated error code on failure.
514  */
515 static loff_t chlg_llseek(struct file *file, loff_t off, int whence)
516 {
517         struct chlg_reader_state *crs = file->private_data;
518         loff_t pos;
519         int rc;
520
521         switch (whence) {
522         case SEEK_SET:
523                 pos = off;
524                 break;
525         case SEEK_CUR:
526                 pos = file->f_pos + off;
527                 break;
528         case SEEK_END:
529         default:
530                 return -EINVAL;
531         }
532
533         /* We cannot go backward */
534         if (pos < file->f_pos)
535                 return -EINVAL;
536
537         rc = chlg_set_start_offset(crs, pos);
538         if (rc != 0)
539                 return rc;
540
541         file->f_pos = pos;
542         return pos;
543 }
544
545 /**
546  * Clear record range for a given changelog reader.
547  *
548  * @param[in]  crs     Current internal state.
549  * @param[in]  reader  Changelog reader ID (cl1, cl2...)
550  * @param[in]  record  Record index up which to clear
551  * @return 0 on success, negated error code on failure.
552  */
553 static int chlg_clear(struct chlg_reader_state *crs, __u32 reader, __u64 record)
554 {
555         struct obd_device *obd = NULL;
556         struct changelog_setinfo cs  = {
557                 .cs_recno = record,
558                 .cs_id    = reader
559         };
560         int rc;
561
562         obd = chlg_obd_get(crs->crs_ced);
563         if (obd == NULL)
564                 return -ENODEV;
565
566         rc = obd_set_info_async(NULL, obd->obd_self_export,
567                                 strlen(KEY_CHANGELOG_CLEAR),
568                                 KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL);
569
570         chlg_obd_put(crs->crs_ced, obd);
571         return rc;
572 }
573
574 /** Maximum changelog control command size */
575 #define CHLG_CONTROL_CMD_MAX    64
576
577 /**
578  * Handle writes() into the changelog character device. Write() can be used
579  * to request special control operations.
580  *
581  * @param[in]  file  File pointer to the changelog character device
582  * @param[in]  buff  User supplied data (written data)
583  * @param[in]  count Number of written bytes
584  * @param[in]  off   (unused)
585  * @return number of written bytes on success, negated error code on failure.
586  */
587 static ssize_t chlg_write(struct file *file, const char __user *buff,
588                           size_t count, loff_t *off)
589 {
590         struct chlg_reader_state *crs = file->private_data;
591         char *kbuf;
592         __u64 record;
593         __u32 reader;
594         int rc = 0;
595         ENTRY;
596
597         if (count > CHLG_CONTROL_CMD_MAX)
598                 RETURN(-EINVAL);
599
600         OBD_ALLOC(kbuf, CHLG_CONTROL_CMD_MAX);
601         if (kbuf == NULL)
602                 RETURN(-ENOMEM);
603
604         if (copy_from_user(kbuf, buff, count))
605                 GOTO(out_kbuf, rc = -EFAULT);
606
607         kbuf[CHLG_CONTROL_CMD_MAX - 1] = '\0';
608
609         if (sscanf(kbuf, "clear:cl%u:%llu", &reader, &record) == 2)
610                 rc = chlg_clear(crs, reader, record);
611         else
612                 rc = -EINVAL;
613
614         EXIT;
615 out_kbuf:
616         OBD_FREE(kbuf, CHLG_CONTROL_CMD_MAX);
617         return rc < 0 ? rc : count;
618 }
619
620 /**
621  * Open handler, initialize internal CRS state and spawn prefetch thread if
622  * needed.
623  * @param[in]  inode  Inode struct for the open character device.
624  * @param[in]  file   Corresponding file pointer.
625  * @return 0 on success, negated error code on failure.
626  */
627 static int chlg_open(struct inode *inode, struct file *file)
628 {
629         struct chlg_reader_state *crs;
630         struct chlg_registered_dev *dev;
631         ENTRY;
632
633         dev = container_of(inode->i_cdev, struct chlg_registered_dev, ced_cdev);
634
635         OBD_ALLOC_PTR(crs);
636         if (!crs)
637                 RETURN(-ENOMEM);
638
639         kref_get(&dev->ced_refs);
640         crs->crs_ced = dev;
641         crs->crs_err = false;
642         crs->crs_eof = false;
643
644         mutex_init(&crs->crs_lock);
645         INIT_LIST_HEAD(&crs->crs_rec_queue);
646         init_waitqueue_head(&crs->crs_waitq_prod);
647         init_waitqueue_head(&crs->crs_waitq_cons);
648         crs->crs_prod_task = NULL;
649
650         file->private_data = crs;
651         RETURN(0);
652 }
653
654 /**
655  * Close handler, release resources.
656  *
657  * @param[in]  inode  Inode struct for the open character device.
658  * @param[in]  file   Corresponding file pointer.
659  * @return 0 on success, negated error code on failure.
660  */
661 static int chlg_release(struct inode *inode, struct file *file)
662 {
663         struct chlg_reader_state *crs = file->private_data;
664         struct chlg_rec_entry *rec;
665         struct chlg_rec_entry *tmp;
666         int rc = 0;
667
668         if (crs->crs_prod_task)
669                 rc = kthread_stop(crs->crs_prod_task);
670
671         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage)
672                 enq_record_delete(rec);
673
674         kref_put(&crs->crs_ced->ced_refs, chlg_dev_clear);
675         OBD_FREE_PTR(crs);
676
677         return rc;
678 }
679
680 /**
681  * Poll handler, indicates whether the device is readable (new records) and
682  * writable (always).
683  *
684  * @param[in]  file   Device file pointer.
685  * @param[in]  wait   (opaque)
686  * @return combination of the poll status flags.
687  */
688 static unsigned int chlg_poll(struct file *file, poll_table *wait)
689 {
690         struct chlg_reader_state *crs = file->private_data;
691         unsigned int mask = 0;
692         int rc;
693
694         rc = chlg_start_thread(file);
695         if (rc)
696                 RETURN(rc);
697
698         mutex_lock(&crs->crs_lock);
699         poll_wait(file, &crs->crs_waitq_cons, wait);
700         if (crs->crs_rec_count > 0)
701                 mask |= POLLIN | POLLRDNORM;
702         if (crs->crs_err)
703                 mask |= POLLERR;
704         if (crs->crs_eof)
705                 mask |= POLLHUP;
706         mutex_unlock(&crs->crs_lock);
707         return mask;
708 }
709
710 static long chlg_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
711 {
712         int rc;
713
714         struct chlg_reader_state *crs = file->private_data;
715         switch (cmd) {
716         case OBD_IOC_CHLG_POLL:
717                 crs->crs_poll = !!arg;
718                 rc = 0;
719                 break;
720         default:
721                 rc = -EINVAL;
722                 break;
723         }
724         return rc;
725 }
726
727 static const struct file_operations chlg_fops = {
728         .owner          = THIS_MODULE,
729         .llseek         = chlg_llseek,
730         .read           = chlg_read,
731         .write          = chlg_write,
732         .open           = chlg_open,
733         .release        = chlg_release,
734         .poll           = chlg_poll,
735         .unlocked_ioctl = chlg_ioctl,
736 };
737
738 /**
739  * This uses obd_name of the form: "testfs-MDT0000-mdc-ffff88006501600"
740  * and returns a name of the form: "changelog-testfs-MDT0000".
741  */
742 static void get_target_name(char *name, size_t name_len, struct obd_device *obd)
743 {
744         int i;
745
746         snprintf(name, name_len, "%s", obd->obd_name);
747
748         /* Find the 2nd '-' from the end and truncate on it */
749         for (i = 0; i < 2; i++) {
750                 char *p = strrchr(name, '-');
751
752                 if (p == NULL)
753                         return;
754                 *p = '\0';
755         }
756 }
757
758 /**
759  * Find a changelog character device by name.
760  * All devices registered during MDC setup are listed in a global list with
761  * their names attached.
762  */
763 static struct chlg_registered_dev *
764 chlg_registered_dev_find_by_name(const char *name)
765 {
766         struct chlg_registered_dev *dit;
767
768         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
769         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
770                 if (strcmp(name, dit->ced_name) == 0)
771                         return dit;
772         return NULL;
773 }
774
775 /**
776  * Find chlg_registered_dev structure for a given OBD device.
777  * This is bad O(n^2) but for each filesystem:
778  *   - N is # of MDTs times # of mount points
779  *   - this only runs at shutdown
780  */
781 static struct chlg_registered_dev *
782 chlg_registered_dev_find_by_obd(const struct obd_device *obd)
783 {
784         struct chlg_registered_dev *dit;
785         struct obd_device *oit;
786
787         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
788         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
789                 list_for_each_entry(oit, &dit->ced_obds,
790                                     u.cli.cl_chg_dev_linkage)
791                         if (oit == obd)
792                                 return dit;
793         return NULL;
794 }
795
796 /**
797  * Changelog character device initialization.
798  * Register a misc character device with a dynamic minor number, under a name
799  * of the form: 'changelog-fsname-MDTxxxx'. Reference this OBD device with it.
800  *
801  * @param[in] obd  This MDC obd_device.
802  * @return 0 on success, negated error code on failure.
803  */
804 int mdc_changelog_cdev_init(struct obd_device *obd)
805 {
806         struct chlg_registered_dev *exist;
807         struct chlg_registered_dev *entry;
808         int minor, rc;
809         ENTRY;
810
811         OBD_ALLOC_PTR(entry);
812         if (entry == NULL)
813                 RETURN(-ENOMEM);
814
815         get_target_name(entry->ced_name, sizeof(entry->ced_name), obd);
816
817         kref_init(&entry->ced_refs);
818         INIT_LIST_HEAD(&entry->ced_obds);
819         INIT_LIST_HEAD(&entry->ced_link);
820
821         mutex_lock(&chlg_registered_dev_lock);
822         exist = chlg_registered_dev_find_by_name(entry->ced_name);
823         if (exist != NULL) {
824                 kref_get(&exist->ced_refs);
825                 list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &exist->ced_obds);
826                 GOTO(out_unlock, rc = 0);
827         }
828
829         list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds);
830         list_add_tail(&entry->ced_link, &chlg_registered_devices);
831
832         rc = chlg_minor_alloc(&minor);
833         if (rc)
834                 GOTO(out_listrm, rc);
835
836         device_initialize(&entry->ced_device);
837         entry->ced_device.devt = MKDEV(MAJOR(mdc_changelog_dev), minor);
838         entry->ced_device.class = mdc_changelog_class;
839         entry->ced_device.release = chlg_device_release;
840         dev_set_drvdata(&entry->ced_device, entry);
841         rc = dev_set_name(&entry->ced_device, "%s-%s", MDC_CHANGELOG_DEV_NAME,
842                           entry->ced_name);
843         if (rc)
844                 GOTO(out_minor, rc);
845
846         /* Register new character device */
847         cdev_init(&entry->ced_cdev, &chlg_fops);
848         entry->ced_cdev.owner = THIS_MODULE;
849         rc = cdev_device_add(&entry->ced_cdev, &entry->ced_device);
850         if (rc)
851                 GOTO(out_device_name, rc);
852
853         entry = NULL;   /* prevent it from being freed below */
854         GOTO(out_unlock, rc = 0);
855
856 out_device_name:
857         kfree_const(entry->ced_device.kobj.name);
858
859 out_minor:
860         chlg_minor_free(minor);
861
862 out_listrm:
863         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
864         list_del(&entry->ced_link);
865
866 out_unlock:
867         mutex_unlock(&chlg_registered_dev_lock);
868         if (entry)
869                 OBD_FREE_PTR(entry);
870         RETURN(rc);
871 }
872
873 /**
874  * Release OBD, decrease reference count of the corresponding changelog device.
875  */
876 void mdc_changelog_cdev_finish(struct obd_device *obd)
877 {
878         struct chlg_registered_dev *dev;
879
880         ENTRY;
881         mutex_lock(&chlg_registered_dev_lock);
882         dev = chlg_registered_dev_find_by_obd(obd);
883         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
884         kref_put(&dev->ced_refs, chlg_dev_clear);
885         mutex_unlock(&chlg_registered_dev_lock);
886         EXIT;
887 }