Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / mdc / mdc_changelog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Commissariat a l'Energie Atomique et aux Energies
24  *                     Alternatives.
25  *
26  * Copyright (c) 2017, Intel Corporation.
27  *
28  * Author: Henri Doreau <henri.doreau@cea.fr>
29  */
30
31 #define DEBUG_SUBSYSTEM S_MDC
32
33 #include <linux/init.h>
34 #include <linux/kthread.h>
35 #include <linux/poll.h>
36 #include <linux/device.h>
37 #include <linux/cdev.h>
38 #include <linux/idr.h>
39
40 #include <lustre_log.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42
43 #include "mdc_internal.h"
44
45
46 /*
47  * -- Changelog delivery through character device --
48  */
49
50 /**
51  * Mutex to protect chlg_registered_devices below
52  */
53 static DEFINE_MUTEX(chlg_registered_dev_lock);
54
55 /**
56  * Global linked list of all registered devices (one per MDT).
57  */
58 static LIST_HEAD(chlg_registered_devices);
59
60
61 struct chlg_registered_dev {
62         /* Device name of the form "changelog-{MDTNAME}" */
63         char                     ced_name[32];
64         /* changelog char device */
65         struct cdev              ced_cdev;
66         struct device            ced_device;
67         /* OBDs referencing this device (multiple mount point) */
68         struct list_head         ced_obds;
69         /* Reference counter for proper deregistration */
70         struct kref              ced_refs;
71         /* Link within the global chlg_registered_devices */
72         struct list_head         ced_link;
73 };
74
75 struct chlg_reader_state {
76         /* Shortcut to the corresponding OBD device */
77         struct obd_device          *crs_obd;
78         /* the corresponding chlg_registered_dev */
79         struct chlg_registered_dev *crs_ced;
80         /* Producer thread (if any) */
81         struct task_struct         *crs_prod_task;
82         /* An error occurred that prevents from reading further */
83         int                         crs_err;
84         /* EOF, no more records available */
85         bool                        crs_eof;
86         /* Desired start position */
87         __u64                       crs_start_offset;
88         /* Wait queue for the catalog processing thread */
89         wait_queue_head_t           crs_waitq_prod;
90         /* Wait queue for the record copy threads */
91         wait_queue_head_t           crs_waitq_cons;
92         /* Mutex protecting crs_rec_count and crs_rec_queue */
93         struct mutex                crs_lock;
94         /* Number of item in the list */
95         __u64                       crs_rec_count;
96         /* List of prefetched enqueued_record::enq_linkage_items */
97         struct list_head            crs_rec_queue;
98         unsigned int                crs_last_catidx;
99         unsigned int                crs_last_idx;
100         unsigned int                crs_flags;
101 };
102
103 struct chlg_rec_entry {
104         /* Link within the chlg_reader_state::crs_rec_queue list */
105         struct list_head        enq_linkage;
106         /* Data (enq_record) field length */
107         __u64                   enq_length;
108         /* Copy of a changelog record (see struct llog_changelog_rec) */
109         struct changelog_rec    enq_record[];
110 };
111
112 enum {
113         /* Number of records to prefetch locally. */
114         CDEV_CHLG_MAX_PREFETCH = 1024,
115 };
116
117 DEFINE_IDR(mdc_changelog_minor_idr);
118 static DEFINE_SPINLOCK(chlg_minor_lock);
119
120 static int chlg_minor_alloc(int *pminor)
121 {
122         void *minor_allocated = (void *)-1;
123         int minor;
124
125         idr_preload(GFP_KERNEL);
126         spin_lock(&chlg_minor_lock);
127         minor = idr_alloc(&mdc_changelog_minor_idr, minor_allocated, 0,
128                           MDC_CHANGELOG_DEV_COUNT, GFP_NOWAIT);
129         spin_unlock(&chlg_minor_lock);
130         idr_preload_end();
131
132         if (minor < 0)
133                 return minor;
134
135         *pminor = minor;
136         return 0;
137 }
138
139 static void chlg_minor_free(int minor)
140 {
141         spin_lock(&chlg_minor_lock);
142         idr_remove(&mdc_changelog_minor_idr, minor);
143         spin_unlock(&chlg_minor_lock);
144 }
145
146 static void chlg_device_release(struct device *dev)
147 {
148         struct chlg_registered_dev *entry = dev_get_drvdata(dev);
149
150         chlg_minor_free(MINOR(entry->ced_cdev.dev));
151         OBD_FREE_PTR(entry);
152 }
153
154 /**
155  * Deregister a changelog character device whose refcount has reached zero.
156  */
157 static void chlg_dev_clear(struct kref *kref)
158 {
159         struct chlg_registered_dev *entry;
160
161         ENTRY;
162         entry = container_of(kref, struct chlg_registered_dev,
163                              ced_refs);
164
165         list_del(&entry->ced_link);
166         cdev_device_del(&entry->ced_cdev, &entry->ced_device);
167         put_device(&entry->ced_device);
168         EXIT;
169 }
170
171 static inline struct obd_device* chlg_obd_get(struct chlg_registered_dev *dev)
172 {
173         struct obd_device *obd;
174
175         mutex_lock(&chlg_registered_dev_lock);
176         if (list_empty(&dev->ced_obds))
177                 return NULL;
178
179         obd = list_first_entry(&dev->ced_obds, struct obd_device,
180                                u.cli.cl_chg_dev_linkage);
181         class_incref(obd, "changelog", dev);
182         mutex_unlock(&chlg_registered_dev_lock);
183         return obd;
184 }
185
186 static inline void chlg_obd_put(struct chlg_registered_dev *dev,
187                          struct obd_device *obd)
188 {
189         class_decref(obd, "changelog", dev);
190 }
191
192 /**
193  * ChangeLog catalog processing callback invoked on each record.
194  * If the current record is eligible to userland delivery, push
195  * it into the crs_rec_queue where the consumer code will fetch it.
196  *
197  * @param[in]     env  (unused)
198  * @param[in]     llh  Client-side handle used to identify the llog
199  * @param[in]     hdr  Header of the current llog record
200  * @param[in,out] data chlg_reader_state passed from caller
201  *
202  * @return 0 or LLOG_PROC_* control code on success, negated error on failure.
203  */
204 static int chlg_read_cat_process_cb(const struct lu_env *env,
205                                     struct llog_handle *llh,
206                                     struct llog_rec_hdr *hdr, void *data)
207 {
208         struct llog_changelog_rec *rec;
209         struct chlg_reader_state *crs = data;
210         struct chlg_rec_entry *enq;
211         size_t len;
212         int rc;
213         ENTRY;
214
215         LASSERT(crs != NULL);
216         LASSERT(hdr != NULL);
217
218         rec = container_of(hdr, struct llog_changelog_rec, cr_hdr);
219
220         crs->crs_last_catidx = llh->lgh_hdr->llh_cat_idx;
221         crs->crs_last_idx = hdr->lrh_index;
222
223         if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
224                 rc = -EINVAL;
225                 CERROR("%s: not a changelog rec %x/%d in llog : rc = %d\n",
226                        crs->crs_obd->obd_name, rec->cr_hdr.lrh_type,
227                        rec->cr.cr_type, rc);
228                 RETURN(rc);
229         }
230
231         /* Check if we can skip the entire llog plain */
232         if (llog_is_plain_skipable(llh->lgh_hdr, hdr, rec->cr.cr_index,
233                                    crs->crs_start_offset))
234                 RETURN(LLOG_SKIP_PLAIN);
235
236         /* Skip undesired records */
237         if (rec->cr.cr_index < crs->crs_start_offset)
238                 RETURN(0);
239
240         CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n",
241                rec->cr.cr_index, rec->cr.cr_type,
242                changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
243                rec->cr.cr_flags & CLF_FLAGMASK,
244                PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
245                rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
246
247         wait_event_interruptible(crs->crs_waitq_prod,
248                                  crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH ||
249                                  kthread_should_stop());
250
251         if (kthread_should_stop())
252                 RETURN(LLOG_PROC_BREAK);
253
254         len = changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
255         OBD_ALLOC(enq, sizeof(*enq) + len);
256         if (enq == NULL)
257                 RETURN(-ENOMEM);
258
259         INIT_LIST_HEAD(&enq->enq_linkage);
260         enq->enq_length = len;
261         memcpy(enq->enq_record, &rec->cr, len);
262
263         mutex_lock(&crs->crs_lock);
264         list_add_tail(&enq->enq_linkage, &crs->crs_rec_queue);
265         crs->crs_rec_count++;
266         mutex_unlock(&crs->crs_lock);
267
268         wake_up(&crs->crs_waitq_cons);
269
270         RETURN(0);
271 }
272
273 /**
274  * Remove record from the list it is attached to and free it.
275  */
276 static void enq_record_delete(struct chlg_rec_entry *rec)
277 {
278         list_del(&rec->enq_linkage);
279         OBD_FREE(rec, sizeof(*rec) + rec->enq_length);
280 }
281
282 /**
283  * Record prefetch thread entry point. Opens the changelog catalog and starts
284  * reading records.
285  *
286  * @param[in,out]  args  chlg_reader_state passed from caller.
287  * @return 0 on success, negated error code on failure.
288  */
289 static int chlg_load(void *args)
290 {
291         struct chlg_reader_state *crs = args;
292         struct chlg_registered_dev *ced = crs->crs_ced;
293         struct obd_device *obd = NULL;
294         struct llog_ctxt *ctx = NULL;
295         struct llog_handle *llh = NULL;
296         int rc;
297         ENTRY;
298
299         crs->crs_last_catidx = 0;
300         crs->crs_last_idx = 0;
301
302 again:
303         obd = chlg_obd_get(ced);
304         if (obd == NULL)
305                 RETURN(-ENODEV);
306
307         crs->crs_obd = obd;
308
309         ctx = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
310         if (ctx == NULL)
311                 GOTO(err_out, rc = -ENOENT);
312
313         rc = llog_open(NULL, ctx, &llh, NULL, CHANGELOG_CATALOG,
314                        LLOG_OPEN_EXISTS);
315         if (rc) {
316                 CERROR("%s: fail to open changelog catalog: rc = %d\n",
317                        obd->obd_name, rc);
318                 GOTO(err_out, rc);
319         }
320
321
322         rc = llog_init_handle(NULL, llh,
323                               LLOG_F_IS_CAT |
324                               LLOG_F_EXT_JOBID |
325                               LLOG_F_EXT_EXTRA_FLAGS |
326                               LLOG_F_EXT_X_UIDGID |
327                               LLOG_F_EXT_X_NID |
328                               LLOG_F_EXT_X_OMODE |
329                               LLOG_F_EXT_X_XATTR,
330                               NULL);
331         if (rc) {
332                 CERROR("%s: fail to init llog handle: rc = %d\n",
333                        obd->obd_name, rc);
334                 GOTO(err_out, rc);
335         }
336
337         rc = llog_cat_process(NULL, llh, chlg_read_cat_process_cb, crs,
338                                 crs->crs_last_catidx, crs->crs_last_idx);
339         if (rc < 0) {
340                 CERROR("%s: fail to process llog: rc = %d\n", obd->obd_name, rc);
341                 GOTO(err_out, rc);
342         }
343         if (!kthread_should_stop() &&
344             (crs->crs_flags & CHANGELOG_FLAG_FOLLOW)) {
345                 llog_cat_close(NULL, llh);
346                 llog_ctxt_put(ctx);
347                 class_decref(obd, "changelog", crs);
348                 schedule_timeout_interruptible(cfs_time_seconds(1));
349                 goto again;
350         }
351
352         crs->crs_eof = true;
353
354 err_out:
355         if (rc < 0)
356                 crs->crs_err = rc;
357
358         wake_up(&crs->crs_waitq_cons);
359
360         if (llh != NULL)
361                 llog_cat_close(NULL, llh);
362
363         if (ctx != NULL)
364                 llog_ctxt_put(ctx);
365
366         crs->crs_obd = NULL;
367         chlg_obd_put(ced, obd);
368         wait_event_interruptible(crs->crs_waitq_prod, kthread_should_stop());
369
370         RETURN(rc);
371 }
372
373 static int chlg_start_thread(struct file *file)
374 {
375         struct chlg_reader_state *crs = file->private_data;
376         struct task_struct *task;
377         int rc = 0;
378
379         if (likely(crs->crs_prod_task))
380                 return 0;
381         if (unlikely(file->f_mode & FMODE_READ) == 0)
382                 return 0;
383
384         mutex_lock(&crs->crs_lock);
385         if (crs->crs_prod_task == NULL) {
386                 task = kthread_run(chlg_load, crs, "chlg_load_thread");
387                 if (IS_ERR(task)) {
388                         rc = PTR_ERR(task);
389                         CERROR("%s: cannot start changelog thread: rc = %d\n",
390                                crs->crs_ced->ced_name, rc);
391                         GOTO(out, rc);
392                 }
393                 crs->crs_prod_task = task;
394         }
395 out:
396         mutex_unlock(&crs->crs_lock);
397         return rc;
398 }
399
400 /**
401  * Read handler, dequeues records from the chlg_reader_state if any.
402  * No partial records are copied to userland so this function can return less
403  * data than required (short read).
404  *
405  * @param[in]   file   File pointer to the character device.
406  * @param[out]  buff   Userland buffer where to copy the records.
407  * @param[in]   count  Userland buffer size.
408  * @param[out]  ppos   File position, updated with the index number of the next
409  *                     record to read.
410  * @return number of copied bytes on success, negated error code on failure.
411  */
412 static ssize_t chlg_read(struct file *file, char __user *buff, size_t count,
413                          loff_t *ppos)
414 {
415         struct chlg_reader_state *crs = file->private_data;
416         struct chlg_rec_entry *rec;
417         struct chlg_rec_entry *tmp;
418         size_t written_total = 0;
419         ssize_t rc;
420         LIST_HEAD(consumed);
421         ENTRY;
422
423         if (file->f_flags & O_NONBLOCK && crs->crs_rec_count == 0) {
424                 if (crs->crs_err < 0)
425                         RETURN(crs->crs_err);
426                 else if (crs->crs_eof)
427                         RETURN(0);
428                 else
429                         RETURN(-EAGAIN);
430         }
431
432         rc = chlg_start_thread(file);
433         if (rc)
434                 RETURN(rc);
435
436         rc = wait_event_interruptible(crs->crs_waitq_cons,
437                         crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err);
438
439         mutex_lock(&crs->crs_lock);
440         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
441                 if (written_total + rec->enq_length > count)
442                         break;
443
444                 if (copy_to_user(buff, rec->enq_record, rec->enq_length)) {
445                         rc = -EFAULT;
446                         break;
447                 }
448
449                 buff += rec->enq_length;
450                 written_total += rec->enq_length;
451
452                 crs->crs_rec_count--;
453                 list_move_tail(&rec->enq_linkage, &consumed);
454
455                 crs->crs_start_offset = rec->enq_record->cr_index + 1;
456         }
457         mutex_unlock(&crs->crs_lock);
458
459         if (written_total > 0) {
460                 rc = written_total;
461                 wake_up(&crs->crs_waitq_prod);
462         } else if (rc == 0) {
463                 rc = crs->crs_err;
464         }
465
466         list_for_each_entry_safe(rec, tmp, &consumed, enq_linkage)
467                 enq_record_delete(rec);
468
469         *ppos = crs->crs_start_offset;
470
471         RETURN(rc);
472 }
473
474 /**
475  * Jump to a given record index. Helper for chlg_llseek().
476  *
477  * @param[in,out]  crs     Internal reader state.
478  * @param[in]      offset  Desired offset (index record).
479  * @return 0 on success, negated error code on failure.
480  */
481 static int chlg_set_start_offset(struct chlg_reader_state *crs, __u64 offset)
482 {
483         struct chlg_rec_entry *rec;
484         struct chlg_rec_entry *tmp;
485
486         mutex_lock(&crs->crs_lock);
487         if (offset < crs->crs_start_offset) {
488                 mutex_unlock(&crs->crs_lock);
489                 return -ERANGE;
490         }
491
492         crs->crs_start_offset = offset;
493         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
494                 struct changelog_rec *cr = rec->enq_record;
495
496                 if (cr->cr_index >= crs->crs_start_offset)
497                         break;
498
499                 crs->crs_rec_count--;
500                 enq_record_delete(rec);
501         }
502
503         mutex_unlock(&crs->crs_lock);
504         wake_up(&crs->crs_waitq_prod);
505         return 0;
506 }
507
508 /**
509  * Move read pointer to a certain record index, encoded as an offset.
510  *
511  * @param[in,out] file   File pointer to the changelog character device
512  * @param[in]     off    Offset to skip, actually a record index, not byte count
513  * @param[in]     whence Relative/Absolute interpretation of the offset
514  * @return the resulting position on success or negated error code on failure.
515  */
516 static loff_t chlg_llseek(struct file *file, loff_t off, int whence)
517 {
518         struct chlg_reader_state *crs = file->private_data;
519         loff_t pos;
520         int rc;
521
522         switch (whence) {
523         case SEEK_SET:
524                 pos = off;
525                 break;
526         case SEEK_CUR:
527                 pos = file->f_pos + off;
528                 break;
529         case SEEK_END:
530         default:
531                 return -EINVAL;
532         }
533
534         /* We cannot go backward */
535         if (pos < file->f_pos)
536                 return -EINVAL;
537
538         rc = chlg_set_start_offset(crs, pos);
539         if (rc != 0)
540                 return rc;
541
542         file->f_pos = pos;
543         return pos;
544 }
545
546 /**
547  * Clear record range for a given changelog reader.
548  *
549  * @param[in]  crs     Current internal state.
550  * @param[in]  reader  Changelog reader ID (cl1, cl2...)
551  * @param[in]  record  Record index up which to clear
552  * @return 0 on success, negated error code on failure.
553  */
554 static int chlg_clear(struct chlg_reader_state *crs, __u32 reader, __u64 record)
555 {
556         struct obd_device *obd = NULL;
557         struct changelog_setinfo cs  = {
558                 .cs_recno = record,
559                 .cs_id    = reader
560         };
561         int rc;
562
563         obd = chlg_obd_get(crs->crs_ced);
564         if (obd == NULL)
565                 return -ENODEV;
566
567         rc = obd_set_info_async(NULL, obd->obd_self_export,
568                                 strlen(KEY_CHANGELOG_CLEAR),
569                                 KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL);
570
571         chlg_obd_put(crs->crs_ced, obd);
572         return rc;
573 }
574
575 /** Maximum changelog control command size */
576 #define CHLG_CONTROL_CMD_MAX    64
577
578 /**
579  * Handle writes() into the changelog character device. Write() can be used
580  * to request special control operations.
581  *
582  * @param[in]  file  File pointer to the changelog character device
583  * @param[in]  buff  User supplied data (written data)
584  * @param[in]  count Number of written bytes
585  * @param[in]  off   (unused)
586  * @return number of written bytes on success, negated error code on failure.
587  */
588 static ssize_t chlg_write(struct file *file, const char __user *buff,
589                           size_t count, loff_t *off)
590 {
591         struct chlg_reader_state *crs = file->private_data;
592         char *kbuf;
593         __u64 record;
594         __u32 reader;
595         int rc = 0;
596         ENTRY;
597
598         if (count > CHLG_CONTROL_CMD_MAX)
599                 RETURN(-EINVAL);
600
601         OBD_ALLOC(kbuf, CHLG_CONTROL_CMD_MAX);
602         if (kbuf == NULL)
603                 RETURN(-ENOMEM);
604
605         if (copy_from_user(kbuf, buff, count))
606                 GOTO(out_kbuf, rc = -EFAULT);
607
608         kbuf[CHLG_CONTROL_CMD_MAX - 1] = '\0';
609
610         if (sscanf(kbuf, "clear:cl%u:%llu", &reader, &record) == 2)
611                 rc = chlg_clear(crs, reader, record);
612         else
613                 rc = -EINVAL;
614
615         EXIT;
616 out_kbuf:
617         OBD_FREE(kbuf, CHLG_CONTROL_CMD_MAX);
618         return rc < 0 ? rc : count;
619 }
620
621 /**
622  * Open handler, initialize internal CRS state and spawn prefetch thread if
623  * needed.
624  * @param[in]  inode  Inode struct for the open character device.
625  * @param[in]  file   Corresponding file pointer.
626  * @return 0 on success, negated error code on failure.
627  */
628 static int chlg_open(struct inode *inode, struct file *file)
629 {
630         struct chlg_reader_state *crs;
631         struct chlg_registered_dev *dev;
632         ENTRY;
633
634         dev = container_of(inode->i_cdev, struct chlg_registered_dev, ced_cdev);
635
636         OBD_ALLOC_PTR(crs);
637         if (!crs)
638                 RETURN(-ENOMEM);
639
640         kref_get(&dev->ced_refs);
641         crs->crs_ced = dev;
642         crs->crs_err = false;
643         crs->crs_eof = false;
644
645         mutex_init(&crs->crs_lock);
646         INIT_LIST_HEAD(&crs->crs_rec_queue);
647         init_waitqueue_head(&crs->crs_waitq_prod);
648         init_waitqueue_head(&crs->crs_waitq_cons);
649         crs->crs_prod_task = NULL;
650
651         file->private_data = crs;
652         RETURN(0);
653 }
654
655 /**
656  * Close handler, release resources.
657  *
658  * @param[in]  inode  Inode struct for the open character device.
659  * @param[in]  file   Corresponding file pointer.
660  * @return 0 on success, negated error code on failure.
661  */
662 static int chlg_release(struct inode *inode, struct file *file)
663 {
664         struct chlg_reader_state *crs = file->private_data;
665         struct chlg_rec_entry *rec;
666         struct chlg_rec_entry *tmp;
667         int rc = 0;
668
669         if (crs->crs_prod_task)
670                 rc = kthread_stop(crs->crs_prod_task);
671
672         list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage)
673                 enq_record_delete(rec);
674
675         kref_put(&crs->crs_ced->ced_refs, chlg_dev_clear);
676         OBD_FREE_PTR(crs);
677
678         return rc;
679 }
680
681 /**
682  * Poll handler, indicates whether the device is readable (new records) and
683  * writable (always).
684  *
685  * @param[in]  file   Device file pointer.
686  * @param[in]  wait   (opaque)
687  * @return combination of the poll status flags.
688  */
689 static unsigned int chlg_poll(struct file *file, poll_table *wait)
690 {
691         struct chlg_reader_state *crs = file->private_data;
692         unsigned int mask = 0;
693         int rc;
694
695         rc = chlg_start_thread(file);
696         if (rc)
697                 RETURN(rc);
698
699         mutex_lock(&crs->crs_lock);
700         poll_wait(file, &crs->crs_waitq_cons, wait);
701         if (crs->crs_rec_count > 0)
702                 mask |= POLLIN | POLLRDNORM;
703         if (crs->crs_err)
704                 mask |= POLLERR;
705         if (crs->crs_eof)
706                 mask |= POLLHUP;
707         mutex_unlock(&crs->crs_lock);
708         return mask;
709 }
710
711 static long chlg_ioctl(struct file *file, unsigned int cmd, unsigned long flags)
712 {
713         int rc;
714
715         struct chlg_reader_state *crs = file->private_data;
716         switch (cmd) {
717         case OBD_IOC_CHLG_POLL:
718                 crs->crs_flags = flags;
719                 rc = 0;
720                 break;
721         default:
722                 rc = -EINVAL;
723                 break;
724         }
725         return rc;
726 }
727
728 static const struct file_operations chlg_fops = {
729         .owner          = THIS_MODULE,
730         .llseek         = chlg_llseek,
731         .read           = chlg_read,
732         .write          = chlg_write,
733         .open           = chlg_open,
734         .release        = chlg_release,
735         .poll           = chlg_poll,
736         .unlocked_ioctl = chlg_ioctl,
737 };
738
739 /**
740  * This uses obd_name of the form: "testfs-MDT0000-mdc-ffff88006501600"
741  * and returns a name of the form: "changelog-testfs-MDT0000".
742  */
743 static void get_target_name(char *name, size_t name_len, struct obd_device *obd)
744 {
745         int i;
746
747         snprintf(name, name_len, "%s", obd->obd_name);
748
749         /* Find the 2nd '-' from the end and truncate on it */
750         for (i = 0; i < 2; i++) {
751                 char *p = strrchr(name, '-');
752
753                 if (p == NULL)
754                         return;
755                 *p = '\0';
756         }
757 }
758
759 /**
760  * Find a changelog character device by name.
761  * All devices registered during MDC setup are listed in a global list with
762  * their names attached.
763  */
764 static struct chlg_registered_dev *
765 chlg_registered_dev_find_by_name(const char *name)
766 {
767         struct chlg_registered_dev *dit;
768
769         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
770         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
771                 if (strcmp(name, dit->ced_name) == 0)
772                         return dit;
773         return NULL;
774 }
775
776 /**
777  * Find chlg_registered_dev structure for a given OBD device.
778  * This is bad O(n^2) but for each filesystem:
779  *   - N is # of MDTs times # of mount points
780  *   - this only runs at shutdown
781  */
782 static struct chlg_registered_dev *
783 chlg_registered_dev_find_by_obd(const struct obd_device *obd)
784 {
785         struct chlg_registered_dev *dit;
786         struct obd_device *oit;
787
788         LASSERT(mutex_is_locked(&chlg_registered_dev_lock));
789         list_for_each_entry(dit, &chlg_registered_devices, ced_link)
790                 list_for_each_entry(oit, &dit->ced_obds,
791                                     u.cli.cl_chg_dev_linkage)
792                         if (oit == obd)
793                                 return dit;
794         return NULL;
795 }
796
797 /**
798  * Changelog character device initialization.
799  * Register a misc character device with a dynamic minor number, under a name
800  * of the form: 'changelog-fsname-MDTxxxx'. Reference this OBD device with it.
801  *
802  * @param[in] obd  This MDC obd_device.
803  * @return 0 on success, negated error code on failure.
804  */
805 int mdc_changelog_cdev_init(struct obd_device *obd)
806 {
807         struct chlg_registered_dev *exist;
808         struct chlg_registered_dev *entry;
809         int minor, rc;
810         ENTRY;
811
812         OBD_ALLOC_PTR(entry);
813         if (entry == NULL)
814                 RETURN(-ENOMEM);
815
816         get_target_name(entry->ced_name, sizeof(entry->ced_name), obd);
817
818         kref_init(&entry->ced_refs);
819         INIT_LIST_HEAD(&entry->ced_obds);
820         INIT_LIST_HEAD(&entry->ced_link);
821
822         mutex_lock(&chlg_registered_dev_lock);
823         exist = chlg_registered_dev_find_by_name(entry->ced_name);
824         if (exist != NULL) {
825                 kref_get(&exist->ced_refs);
826                 list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &exist->ced_obds);
827                 GOTO(out_unlock, rc = 0);
828         }
829
830         list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds);
831         list_add_tail(&entry->ced_link, &chlg_registered_devices);
832
833         rc = chlg_minor_alloc(&minor);
834         if (rc)
835                 GOTO(out_listrm, rc);
836
837         device_initialize(&entry->ced_device);
838         entry->ced_device.devt = MKDEV(MAJOR(mdc_changelog_dev), minor);
839         entry->ced_device.class = mdc_changelog_class;
840         entry->ced_device.release = chlg_device_release;
841         dev_set_drvdata(&entry->ced_device, entry);
842         rc = dev_set_name(&entry->ced_device, "%s-%s", MDC_CHANGELOG_DEV_NAME,
843                           entry->ced_name);
844         if (rc)
845                 GOTO(out_minor, rc);
846
847         /* Register new character device */
848         cdev_init(&entry->ced_cdev, &chlg_fops);
849         entry->ced_cdev.owner = THIS_MODULE;
850         rc = cdev_device_add(&entry->ced_cdev, &entry->ced_device);
851         if (rc)
852                 GOTO(out_device_name, rc);
853
854         entry = NULL;   /* prevent it from being freed below */
855         GOTO(out_unlock, rc = 0);
856
857 out_device_name:
858         kfree_const(entry->ced_device.kobj.name);
859
860 out_minor:
861         chlg_minor_free(minor);
862
863 out_listrm:
864         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
865         list_del(&entry->ced_link);
866
867 out_unlock:
868         mutex_unlock(&chlg_registered_dev_lock);
869         if (entry)
870                 OBD_FREE_PTR(entry);
871         RETURN(rc);
872 }
873
874 /**
875  * Release OBD, decrease reference count of the corresponding changelog device.
876  */
877 void mdc_changelog_cdev_finish(struct obd_device *obd)
878 {
879         struct chlg_registered_dev *dev;
880
881         ENTRY;
882         mutex_lock(&chlg_registered_dev_lock);
883         dev = chlg_registered_dev_find_by_obd(obd);
884         list_del_init(&obd->u.cli.cl_chg_dev_linkage);
885         kref_put(&dev->ced_refs, chlg_dev_clear);
886         mutex_unlock(&chlg_registered_dev_lock);
887         EXIT;
888 }