Whamcloud - gitweb
LU-1301 mgs: env and mgs_device to pass around
[fs/lustre-release.git] / lustre / mgs / mgs_nids.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_nids.c
37  *
38  * NID table management for lustre.
39  *
40  * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MGS
44 #define D_MGS D_CONFIG
45
46 #ifdef __KERNEL__
47 #include <linux/module.h>
48 #include <linux/pagemap.h>
49 #include <linux/fs.h>
50 #endif
51
52 #include <obd.h>
53 #include <obd_lov.h>
54 #include <obd_class.h>
55 #include <lustre_log.h>
56 #include <obd_ost.h>
57 #include <libcfs/list.h>
58 #include <linux/lvfs.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include <lustre_param.h>
62 #include "mgs_internal.h"
63
64 static unsigned int ir_timeout;
65
66 static int nidtbl_is_sane(struct mgs_nidtbl *tbl)
67 {
68         struct mgs_nidtbl_target *tgt;
69         int version = 0;
70
71         LASSERT(cfs_mutex_is_locked(&tbl->mn_lock));
72         cfs_list_for_each_entry(tgt, &tbl->mn_targets, mnt_list) {
73                 if (!tgt->mnt_version)
74                         continue;
75
76                 if (version >= tgt->mnt_version)
77                         return 0;
78
79                 version = tgt->mnt_version;
80         }
81         return 1;
82 }
83
84 /**
85  * Fetch nidtbl entries whose version are not less than @version
86  * nidtbl entries will be packed in @pages by @unit_size units - entries
87  * shouldn't cross unit boundaries.
88  */
89 static int mgs_nidtbl_read(struct obd_export *exp, struct mgs_nidtbl *tbl,
90                            struct mgs_config_res *res, cfs_page_t **pages,
91                            int nrpages, int units_total, int unit_size)
92 {
93         struct mgs_nidtbl_target *tgt;
94         struct mgs_nidtbl_entry  *entry;
95         struct mgs_nidtbl_entry  *last_in_unit = NULL;
96         struct mgs_target_info   *mti;
97         __u64 version = res->mcr_offset;
98         bool nobuf = false;
99         void *buf = NULL;
100         int bytes_in_unit = 0;
101         int units_in_page = 0;
102         int index = 0;
103         int rc = 0;
104         ENTRY;
105
106         /* make sure unit_size is power 2 */
107         LASSERT((unit_size & (unit_size - 1)) == 0);
108         LASSERT(nrpages << CFS_PAGE_SHIFT >= units_total * unit_size);
109
110         cfs_mutex_lock(&tbl->mn_lock);
111         LASSERT(nidtbl_is_sane(tbl));
112
113         /* no more entries ? */
114         if (version > tbl->mn_version) {
115                 version = tbl->mn_version;
116                 goto out;
117         }
118
119         /* iterate over all targets to compose a bitmap by the type of llog.
120          * If the llog is for MDTs, llog entries for OSTs will be returned;
121          * otherwise, it's for clients, then llog entries for both OSTs and
122          * MDTs will be returned.
123          */
124         cfs_list_for_each_entry(tgt, &tbl->mn_targets, mnt_list) {
125                 int entry_len = sizeof(*entry);
126
127                 if (tgt->mnt_version < version)
128                         continue;
129
130                 /* write target recover information */
131                 mti  = &tgt->mnt_mti;
132                 LASSERT(mti->mti_nid_count < MTI_NIDS_MAX);
133                 entry_len += mti->mti_nid_count * sizeof(lnet_nid_t);
134
135                 if (entry_len > unit_size) {
136                         CWARN("nidtbl: too large entry: entry length %d,"
137                               "unit size: %d\n", entry_len, unit_size);
138                         GOTO(out, rc = -EOVERFLOW);
139                 }
140
141                 if (bytes_in_unit < entry_len) {
142                         if (units_total == 0) {
143                                 nobuf = true;
144                                 break;
145                         }
146
147                         /* check if we need to consume remaining bytes. */
148                         if (last_in_unit != NULL && bytes_in_unit) {
149 #
150 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 6, 50, 0)
151                                 /* May need to swab back to update the length.*/
152                                 if (exp->exp_need_mne_swab)
153                                         lustre_swab_mgs_nidtbl_entry(last_in_unit);
154 #endif
155                                 last_in_unit->mne_length += bytes_in_unit;
156 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 6, 50, 0)
157                                 if (exp->exp_need_mne_swab)
158                                         lustre_swab_mgs_nidtbl_entry(last_in_unit);
159 #endif
160                                 rc  += bytes_in_unit;
161                                 buf += bytes_in_unit;
162                                 last_in_unit = NULL;
163                         }
164                         LASSERT((rc & (unit_size - 1)) == 0);
165
166                         if (units_in_page == 0) {
167                                 /* allocate a new page */
168                                 pages[index] = cfs_alloc_page(CFS_ALLOC_STD);
169                                 if (pages[index] == NULL) {
170                                         rc = -ENOMEM;
171                                         break;
172                                 }
173
174                                 /* destroy previous map */
175                                 if (index > 0)
176                                         cfs_kunmap(pages[index - 1]);
177
178                                 /* reassign buffer */
179                                 buf = cfs_kmap(pages[index]);
180                                 ++index;
181
182                                 units_in_page = CFS_PAGE_SIZE / unit_size;
183                                 LASSERT(units_in_page > 0);
184                         }
185
186                         /* allocate an unit */
187                         LASSERT(((long)buf & (unit_size - 1)) == 0);
188                         bytes_in_unit = unit_size;
189                         --units_in_page;
190                         --units_total;
191                 }
192
193                 /* fill in entry. */
194                 entry = (struct mgs_nidtbl_entry *)buf;
195                 entry->mne_version   = tgt->mnt_version;
196                 entry->mne_instance  = mti->mti_instance;
197                 entry->mne_index     = mti->mti_stripe_index;
198                 entry->mne_length    = entry_len;
199                 entry->mne_type      = tgt->mnt_type;
200                 entry->mne_nid_type  = 0;
201                 entry->mne_nid_size  = sizeof(lnet_nid_t);
202                 entry->mne_nid_count = mti->mti_nid_count;
203                 memcpy(entry->u.nids, mti->mti_nids,
204                        mti->mti_nid_count * sizeof(lnet_nid_t));
205
206 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 6, 50, 0)
207                 /* For LU-1644, swab entry for 2.2 clients. */
208                 if (exp->exp_need_mne_swab)
209                         lustre_swab_mgs_nidtbl_entry(entry);
210 #endif
211
212                 version = tgt->mnt_version;
213                 rc     += entry_len;
214                 buf    += entry_len;
215
216                 bytes_in_unit -= entry_len;
217                 last_in_unit   = entry;
218
219                 CDEBUG(D_MGS, "fsname %s, entry size %d, pages %d/%d/%d/%d.\n",
220                        tbl->mn_fsdb->fsdb_name, entry_len,
221                        bytes_in_unit, index, nrpages, units_total);
222         }
223         if (index > 0)
224                 cfs_kunmap(pages[index - 1]);
225 out:
226         LASSERT(version <= tbl->mn_version);
227         res->mcr_size = tbl->mn_version;
228         res->mcr_offset = nobuf ? version : tbl->mn_version;
229         cfs_mutex_unlock(&tbl->mn_lock);
230         LASSERT(ergo(version == 1, rc == 0)); /* get the log first time */
231
232         CDEBUG(D_MGS, "Read IR logs %s return with %d, version %llu\n",
233                tbl->mn_fsdb->fsdb_name, rc, version);
234         RETURN(rc);
235 }
236
237 static int nidtbl_update_version(const struct lu_env *env,
238                                  struct mgs_device *mgs,
239                                  struct mgs_nidtbl *tbl)
240 {
241         struct lvfs_run_ctxt saved;
242         struct file         *file = NULL;
243         char                 filename[sizeof(MGS_NIDTBL_DIR) + 9];
244         u64                  version;
245         loff_t               off = 0;
246         int                  rc;
247         struct obd_device   *obd = tbl->mn_fsdb->fsdb_obd;
248         ENTRY;
249
250         LASSERT(cfs_mutex_is_locked(&tbl->mn_lock));
251         LASSERT(sizeof(filename) < 32);
252
253         sprintf(filename, "%s/%s",
254                 MGS_NIDTBL_DIR, tbl->mn_fsdb->fsdb_name);
255
256         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
257
258         file = l_filp_open(filename, O_RDWR|O_CREAT, 0660);
259         if (!IS_ERR(file)) {
260                 version = cpu_to_le64(tbl->mn_version);
261                 rc = lustre_fwrite(file, &version, sizeof(version), &off);
262                 if (rc == sizeof(version))
263                         rc = 0;
264                 filp_close(file, 0);
265                 fsfilt_sync(obd, obd->u.mgs.mgs_sb);
266         } else {
267                 rc = PTR_ERR(file);
268         }
269
270         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
271         RETURN(rc);
272 }
273
274 #define MGS_NIDTBL_VERSION_INIT 2
275
276 static int nidtbl_read_version(const struct lu_env *env,
277                                struct mgs_device *mgs, struct mgs_nidtbl *tbl)
278 {
279         struct lvfs_run_ctxt saved;
280         struct file         *file = NULL;
281         char                 filename[sizeof(MGS_NIDTBL_DIR) + 9];
282         u64                  version;
283         loff_t               off = 0;
284         int                  rc;
285         struct obd_device   *obd = tbl->mn_fsdb->fsdb_obd;
286         ENTRY;
287
288         LASSERT(cfs_mutex_is_locked(&tbl->mn_lock));
289         LASSERT(sizeof(filename) < 32);
290
291         sprintf(filename, "%s/%s",
292                 MGS_NIDTBL_DIR, tbl->mn_fsdb->fsdb_name);
293
294         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
295
296         file = l_filp_open(filename, O_RDONLY, 0);
297         if (!IS_ERR(file)) {
298                 rc = lustre_fread(file, &version, sizeof(version), &off);
299                 if (rc == sizeof(version))
300                         rc = cpu_to_le64(version);
301                 else if (rc == 0)
302                         rc = MGS_NIDTBL_VERSION_INIT;
303                 else
304                         CERROR("read version file %s error %d\n", filename, rc);
305                 filp_close(file, 0);
306         } else {
307                 rc = PTR_ERR(file);
308                 if (rc == -ENOENT)
309                         rc = MGS_NIDTBL_VERSION_INIT;
310         }
311
312         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
313         RETURN(rc);
314 }
315
316 static int mgs_nidtbl_write(const struct lu_env *env, struct fs_db *fsdb,
317                             struct mgs_target_info *mti)
318 {
319         struct mgs_nidtbl        *tbl;
320         struct mgs_nidtbl_target *tgt;
321         bool found = false;
322         int type   = mti->mti_flags & LDD_F_SV_TYPE_MASK;
323         int rc     = 0;
324         ENTRY;
325
326         type &= ~LDD_F_SV_TYPE_MGS;
327         LASSERT(type != 0);
328
329         tbl = &fsdb->fsdb_nidtbl;
330         cfs_mutex_lock(&tbl->mn_lock);
331         cfs_list_for_each_entry(tgt, &tbl->mn_targets, mnt_list) {
332                 struct mgs_target_info *info = &tgt->mnt_mti;
333                 if (type == tgt->mnt_type &&
334                     mti->mti_stripe_index == info->mti_stripe_index) {
335                         found = true;
336                         break;
337                 }
338         }
339         if (!found) {
340                 OBD_ALLOC_PTR(tgt);
341                 if (tgt == NULL)
342                         GOTO(out, rc = -ENOMEM);
343
344                 CFS_INIT_LIST_HEAD(&tgt->mnt_list);
345                 tgt->mnt_fs      = tbl;
346                 tgt->mnt_version = 0;       /* 0 means invalid */
347                 tgt->mnt_type    = type;
348
349                 ++tbl->mn_nr_targets;
350         }
351
352         tgt->mnt_version = ++tbl->mn_version;
353         tgt->mnt_mti     = *mti;
354
355         cfs_list_move_tail(&tgt->mnt_list, &tbl->mn_targets);
356
357         rc = nidtbl_update_version(env, fsdb->fsdb_mgs, tbl);
358         EXIT;
359
360 out:
361         cfs_mutex_unlock(&tbl->mn_lock);
362         if (rc)
363                 CERROR("Write NID table version for file system %s error %d\n",
364                        fsdb->fsdb_name, rc);
365         return rc;
366 }
367
368 static void mgs_nidtbl_fini_fs(struct fs_db *fsdb)
369 {
370         struct mgs_nidtbl *tbl = &fsdb->fsdb_nidtbl;
371         CFS_LIST_HEAD(head);
372
373         cfs_mutex_lock(&tbl->mn_lock);
374         tbl->mn_nr_targets = 0;
375         cfs_list_splice_init(&tbl->mn_targets, &head);
376         cfs_mutex_unlock(&tbl->mn_lock);
377
378         while (!cfs_list_empty(&head)) {
379                 struct mgs_nidtbl_target *tgt;
380                 tgt = list_entry(head.next, struct mgs_nidtbl_target, mnt_list);
381                 cfs_list_del(&tgt->mnt_list);
382                 OBD_FREE_PTR(tgt);
383         }
384 }
385
386 static int mgs_nidtbl_init_fs(const struct lu_env *env, struct fs_db *fsdb)
387 {
388         struct mgs_nidtbl *tbl = &fsdb->fsdb_nidtbl;
389
390         CFS_INIT_LIST_HEAD(&tbl->mn_targets);
391         cfs_mutex_init(&tbl->mn_lock);
392         tbl->mn_nr_targets = 0;
393         tbl->mn_fsdb = fsdb;
394         cfs_mutex_lock(&tbl->mn_lock);
395         tbl->mn_version = nidtbl_read_version(env, fsdb->fsdb_mgs, tbl);
396         cfs_mutex_unlock(&tbl->mn_lock);
397         CDEBUG(D_MGS, "IR: current version is %llu\n", tbl->mn_version);
398
399         return 0;
400 }
401
402 /* --------- Imperative Recovery relies on nidtbl stuff ------- */
403 void mgs_ir_notify_complete(struct fs_db *fsdb)
404 {
405         struct timeval tv;
406         cfs_duration_t delta;
407
408         cfs_atomic_set(&fsdb->fsdb_notify_phase, 0);
409
410         /* do statistic */
411         fsdb->fsdb_notify_count++;
412         delta = cfs_time_sub(cfs_time_current(), fsdb->fsdb_notify_start);
413         fsdb->fsdb_notify_total += delta;
414         if (delta > fsdb->fsdb_notify_max)
415                 fsdb->fsdb_notify_max = delta;
416
417         cfs_duration_usec(delta, &tv);
418         CDEBUG(D_MGS, "Revoke recover lock of %s completed after %ld.%06lds\n",
419                fsdb->fsdb_name, tv.tv_sec, tv.tv_usec);
420 }
421
422 static int mgs_ir_notify(void *arg)
423 {
424         struct fs_db      *fsdb   = arg;
425         struct ldlm_res_id resid;
426
427         char name[sizeof(fsdb->fsdb_name) + 20];
428
429         LASSERTF(sizeof(name) < 32, "name is too large to be in stack.\n");
430         sprintf(name, "mgs_%s_notify", fsdb->fsdb_name);
431         cfs_daemonize(name);
432
433         cfs_complete(&fsdb->fsdb_notify_comp);
434
435         set_user_nice(current, -2);
436
437         mgc_fsname2resid(fsdb->fsdb_name, &resid, CONFIG_T_RECOVER);
438         while (1) {
439                 struct l_wait_info   lwi = { 0 };
440
441                 l_wait_event(fsdb->fsdb_notify_waitq,
442                              fsdb->fsdb_notify_stop ||
443                              cfs_atomic_read(&fsdb->fsdb_notify_phase),
444                              &lwi);
445                 if (fsdb->fsdb_notify_stop)
446                         break;
447
448                 CDEBUG(D_MGS, "%s woken up, phase is %d\n",
449                        name, cfs_atomic_read(&fsdb->fsdb_notify_phase));
450
451                 fsdb->fsdb_notify_start = cfs_time_current();
452                 mgs_revoke_lock(fsdb->fsdb_mgs, fsdb, CONFIG_T_RECOVER);
453         }
454
455         cfs_complete(&fsdb->fsdb_notify_comp);
456         return 0;
457 }
458
459 int mgs_ir_init_fs(const struct lu_env *env, struct mgs_device *mgs,
460                    struct fs_db *fsdb)
461 {
462         int rc;
463
464         if (!ir_timeout)
465                 ir_timeout = OBD_IR_MGS_TIMEOUT;
466
467         fsdb->fsdb_ir_state = IR_FULL;
468         if (cfs_time_before(cfs_time_current_sec(),
469                             mgs->mgs_start_time + ir_timeout))
470                 fsdb->fsdb_ir_state = IR_STARTUP;
471         fsdb->fsdb_nonir_clients = 0;
472         CFS_INIT_LIST_HEAD(&fsdb->fsdb_clients);
473
474         /* start notify thread */
475         fsdb->fsdb_obd = mgs->mgs_obd;
476         fsdb->fsdb_mgs = mgs;
477         cfs_atomic_set(&fsdb->fsdb_notify_phase, 0);
478         cfs_waitq_init(&fsdb->fsdb_notify_waitq);
479         cfs_init_completion(&fsdb->fsdb_notify_comp);
480         rc = cfs_create_thread(mgs_ir_notify, fsdb, CFS_DAEMON_FLAGS);
481         if (rc > 0)
482                 cfs_wait_for_completion(&fsdb->fsdb_notify_comp);
483         else
484                 CERROR("Start notify thread error %d\n", rc);
485
486         mgs_nidtbl_init_fs(env, fsdb);
487         return 0;
488 }
489
490 void mgs_ir_fini_fs(struct mgs_device *mgs, struct fs_db *fsdb)
491 {
492         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags))
493                 return;
494
495         mgs_fsc_cleanup_by_fsdb(fsdb);
496
497         mgs_nidtbl_fini_fs(fsdb);
498
499         LASSERT(cfs_list_empty(&fsdb->fsdb_clients));
500
501         fsdb->fsdb_notify_stop = 1;
502         cfs_waitq_signal(&fsdb->fsdb_notify_waitq);
503         cfs_wait_for_completion(&fsdb->fsdb_notify_comp);
504 }
505
506 /* caller must have held fsdb_mutex */
507 static inline void ir_state_graduate(struct fs_db *fsdb)
508 {
509         if (fsdb->fsdb_ir_state == IR_STARTUP) {
510                 if (cfs_time_before(fsdb->fsdb_mgs->mgs_start_time + ir_timeout,
511                                     cfs_time_current_sec())) {
512                         fsdb->fsdb_ir_state = IR_FULL;
513                         if (fsdb->fsdb_nonir_clients)
514                                 fsdb->fsdb_ir_state = IR_PARTIAL;
515                 }
516         }
517 }
518
519 int mgs_ir_update(const struct lu_env *env, struct mgs_device *mgs,
520                   struct mgs_target_info *mti)
521 {
522         struct fs_db *fsdb;
523         bool notify = true;
524         int rc;
525
526         if (mti->mti_instance == 0)
527                 return -EINVAL;
528
529         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
530         if (rc)
531                 return rc;
532
533         rc = mgs_nidtbl_write(env, fsdb, mti);
534         if (rc)
535                 return rc;
536
537         /* check ir state */
538         cfs_mutex_lock(&fsdb->fsdb_mutex);
539         ir_state_graduate(fsdb);
540         switch (fsdb->fsdb_ir_state) {
541         case IR_FULL:
542                 mti->mti_flags |= LDD_F_IR_CAPABLE;
543                 break;
544         case IR_DISABLED:
545                 notify = false;
546         case IR_STARTUP:
547         case IR_PARTIAL:
548                 break;
549         default:
550                 LBUG();
551         }
552         cfs_mutex_unlock(&fsdb->fsdb_mutex);
553
554         LASSERT(ergo(mti->mti_flags & LDD_F_IR_CAPABLE, notify));
555         if (notify) {
556                 CDEBUG(D_MGS, "Try to revoke recover lock of %s\n",
557                        fsdb->fsdb_name);
558                 cfs_atomic_inc(&fsdb->fsdb_notify_phase);
559                 cfs_waitq_signal(&fsdb->fsdb_notify_waitq);
560         }
561         return 0;
562 }
563
564 /* NID table can be cached by two entities: Clients and MDTs */
565 enum {
566         IR_CLIENT  = 1,
567         IR_MDT     = 2
568 };
569
570 static int delogname(char *logname, char *fsname, int *typ)
571 {
572         char *ptr;
573         int   type;
574         int   len;
575
576         ptr = strrchr(logname, '-');
577         if (ptr == NULL)
578                 return -EINVAL;
579
580         /* decouple file system name. The llog name may be:
581          * - "prefix-fsname", prefix is "cliir" or "mdtir"
582          */
583         if (strncmp(ptr, "-mdtir", 6) == 0)
584                 type = IR_MDT;
585         else if (strncmp(ptr, "-cliir", 6) == 0)
586                 type = IR_CLIENT;
587         else
588                 return -EINVAL;
589
590         len = ptr - logname;
591         if (len == 0)
592                 return -EINVAL;
593
594         memcpy(fsname, logname, len);
595         fsname[len] = 0;
596         if (typ)
597                 *typ = type;
598         return 0;
599 }
600
601 int mgs_get_ir_logs(struct ptlrpc_request *req)
602 {
603         struct lu_env     *env = req->rq_svc_thread->t_env;
604         struct mgs_device *mgs = exp2mgs_dev(req->rq_export);
605         struct fs_db      *fsdb;
606         struct mgs_config_body  *body;
607         struct mgs_config_res   *res;
608         struct ptlrpc_bulk_desc *desc;
609         struct l_wait_info lwi;
610         char               fsname[16];
611         long               bufsize;
612         int                unit_size;
613         int                type;
614         int                rc = 0;
615         int                i;
616         int                bytes;
617         int                page_count;
618         int                nrpages;
619         cfs_page_t       **pages = NULL;
620         ENTRY;
621
622         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
623         if (body == NULL)
624                 RETURN(-EINVAL);
625
626         if (body->mcb_type != CONFIG_T_RECOVER)
627                 RETURN(-EINVAL);
628
629         rc = delogname(body->mcb_name, fsname, &type);
630         if (rc)
631                 RETURN(rc);
632
633         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
634         if (rc)
635                 RETURN(rc);
636
637         bufsize = body->mcb_units << body->mcb_bits;
638         nrpages = (bufsize + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
639         if (nrpages > PTLRPC_MAX_BRW_PAGES)
640                 RETURN(-EINVAL);
641
642         CDEBUG(D_MGS, "Reading IR log %s bufsize %ld.\n",
643                body->mcb_name, bufsize);
644
645         OBD_ALLOC(pages, sizeof(*pages) * nrpages);
646         if (pages == NULL)
647                 RETURN(-ENOMEM);
648
649         rc = req_capsule_server_pack(&req->rq_pill);
650         if (rc)
651                 GOTO(out, rc);
652
653         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
654         if (res == NULL)
655                 GOTO(out, rc = -EINVAL);
656
657         res->mcr_offset = body->mcb_offset;
658         unit_size = min_t(int, 1 << body->mcb_bits, CFS_PAGE_SIZE);
659         bytes = mgs_nidtbl_read(req->rq_export, &fsdb->fsdb_nidtbl, res,
660                                 pages, nrpages, bufsize / unit_size, unit_size);
661         if (bytes < 0)
662                 GOTO(out, rc = bytes);
663
664         /* start bulk transfer */
665         page_count = (bytes + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
666         LASSERT(page_count <= nrpages);
667         desc = ptlrpc_prep_bulk_exp(req, page_count,
668                                     BULK_PUT_SOURCE, MGS_BULK_PORTAL);
669         if (desc == NULL)
670                 GOTO(out, rc = -ENOMEM);
671
672         for (i = 0; i < page_count && bytes > 0; i++) {
673                 ptlrpc_prep_bulk_page(desc, pages[i], 0,
674                                 min_t(int, bytes, CFS_PAGE_SIZE));
675                 bytes -= CFS_PAGE_SIZE;
676         }
677
678         rc = target_bulk_io(req->rq_export, desc, &lwi);
679         ptlrpc_free_bulk(desc);
680
681 out:
682         if (pages) {
683                 for (i = 0; i < nrpages; i++) {
684                         if (pages[i] == NULL)
685                                 break;
686                         cfs_free_page(pages[i]);
687                 }
688                 OBD_FREE(pages, sizeof(*pages) * nrpages);
689         }
690         return rc;
691 }
692
693 static int lprocfs_ir_set_state(struct fs_db *fsdb, const char *buf)
694 {
695         const char *strings[] = IR_STRINGS;
696         int         state = -1;
697         int         i;
698
699         for (i = 0; i < ARRAY_SIZE(strings); i++) {
700                 if (strcmp(strings[i], buf) == 0) {
701                         state = i;
702                         break;
703                 }
704         }
705         if (state < 0)
706                 return -EINVAL;
707
708         CDEBUG(D_MGS, "change fsr state of %s from %s to %s\n",
709                fsdb->fsdb_name, strings[fsdb->fsdb_ir_state], strings[state]);
710         cfs_mutex_lock(&fsdb->fsdb_mutex);
711         if (state == IR_FULL && fsdb->fsdb_nonir_clients)
712                 state = IR_PARTIAL;
713         fsdb->fsdb_ir_state = state;
714         cfs_mutex_unlock(&fsdb->fsdb_mutex);
715
716         return 0;
717 }
718
719 static int lprocfs_ir_set_timeout(struct fs_db *fsdb, const char *buf)
720 {
721         return -EINVAL;
722 }
723
724 static int lprocfs_ir_clear_stats(struct fs_db *fsdb, const char *buf)
725 {
726         if (*buf)
727                 return -EINVAL;
728
729         fsdb->fsdb_notify_total = 0;
730         fsdb->fsdb_notify_max   = 0;
731         fsdb->fsdb_notify_count = 0;
732         return 0;
733 }
734
735 static struct lproc_ir_cmd {
736         char *name;
737         int   namelen;
738         int (*handler)(struct fs_db *, const char *);
739 } ir_cmds[] = {
740         { "state=",   6, lprocfs_ir_set_state },
741         { "timeout=", 8, lprocfs_ir_set_timeout },
742         { "0",        1, lprocfs_ir_clear_stats }
743 };
744
745 int lprocfs_wr_ir_state(struct file *file, const char *buffer,
746                          unsigned long count, void *data)
747 {
748         struct fs_db *fsdb = data;
749         char *kbuf;
750         char *ptr;
751         int rc = 0;
752
753         if (count > CFS_PAGE_SIZE)
754                 return -EINVAL;
755
756         OBD_ALLOC(kbuf, count + 1);
757         if (kbuf == NULL)
758                 return -ENOMEM;
759
760         if (copy_from_user(kbuf, buffer, count)) {
761                 OBD_FREE(kbuf, count);
762                 return -EFAULT;
763         }
764
765         kbuf[count] = 0; /* buffer is supposed to end with 0 */
766         if (kbuf[count - 1] == '\n')
767                 kbuf[count - 1] = 0;
768         ptr = kbuf;
769
770         /* fsname=<file system name> must be the 1st entry */
771         while (ptr != NULL) {
772                 char *tmpptr;
773                 int i;
774
775                 tmpptr = strchr(ptr, ';');
776                 if (tmpptr)
777                         *tmpptr++ = 0;
778
779                 rc = -EINVAL;
780                 for (i = 0; i < ARRAY_SIZE(ir_cmds); i++) {
781                         struct lproc_ir_cmd *cmd;
782                         int cmdlen;
783
784                         cmd    = &ir_cmds[i];
785                         cmdlen = cmd->namelen;
786                         if (strncmp(cmd->name, ptr, cmdlen) == 0) {
787                                 ptr += cmdlen;
788                                 rc = cmd->handler(fsdb, ptr);
789                                 break;
790                         }
791                 }
792                 if (rc)
793                         break;
794
795                 ptr = tmpptr;
796         }
797         if (rc)
798                 CERROR("Unable to process command: %s(%d)\n", ptr, rc);
799         OBD_FREE(kbuf, count + 1);
800         return rc ?: count;
801 }
802
803 int lprocfs_rd_ir_state(struct seq_file *seq, void *data)
804 {
805         struct fs_db      *fsdb = data;
806         struct mgs_nidtbl *tbl  = &fsdb->fsdb_nidtbl;
807         const char        *ir_strings[] = IR_STRINGS;
808         struct timeval     tv_max;
809         struct timeval     tv;
810
811         /* mgs_live_seq_show() already holds fsdb_mutex. */
812         ir_state_graduate(fsdb);
813
814         seq_printf(seq, "\nimperative_recovery_state:\n");
815         seq_printf(seq,
816                    "    state: %s\n"
817                    "    nonir_clients: %d\n"
818                    "    nidtbl_version: %lld\n",
819                    ir_strings[fsdb->fsdb_ir_state], fsdb->fsdb_nonir_clients,
820                    tbl->mn_version);
821
822         cfs_duration_usec(fsdb->fsdb_notify_total, &tv);
823         cfs_duration_usec(fsdb->fsdb_notify_max, &tv_max);
824
825         seq_printf(seq, "    notify_duration_total: %lu.%06lu\n"
826                         "    notify_duation_max: %lu.%06lu\n"
827                         "    notify_count: %u\n",
828                    tv.tv_sec, tv.tv_usec,
829                    tv_max.tv_sec, tv_max.tv_usec,
830                    fsdb->fsdb_notify_count);
831
832         return 0;
833 }
834
835 int lprocfs_rd_ir_timeout(char *page, char **start, off_t off, int count,
836                           int *eof, void *data)
837 {
838         *eof = 1;
839         return snprintf(page, count, "%d\n", ir_timeout);
840 }
841
842 int lprocfs_wr_ir_timeout(struct file *file, const char *buffer,
843                           unsigned long count, void *data)
844 {
845         return lprocfs_wr_uint(file, buffer, count, &ir_timeout);
846 }
847
848 /* --------------- Handle non IR support clients --------------- */
849 /* attach a lustre file system to an export */
850 int mgs_fsc_attach(const struct lu_env *env, struct obd_export *exp,
851                    char *fsname)
852 {
853         struct mgs_export_data *data = &exp->u.eu_mgs_data;
854         struct mgs_device *mgs = exp2mgs_dev(exp);
855         struct fs_db      *fsdb;
856         struct mgs_fsc    *fsc     = NULL;
857         struct mgs_fsc    *new_fsc = NULL;
858         bool               found   = false;
859         int                rc;
860         ENTRY;
861
862         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
863         if (rc)
864                 RETURN(rc);
865
866         /* allocate a new fsc in case we need it in spinlock. */
867         OBD_ALLOC_PTR(new_fsc);
868         if (new_fsc == NULL)
869                 RETURN(-ENOMEM);
870
871         CFS_INIT_LIST_HEAD(&new_fsc->mfc_export_list);
872         CFS_INIT_LIST_HEAD(&new_fsc->mfc_fsdb_list);
873         new_fsc->mfc_fsdb       = fsdb;
874         new_fsc->mfc_export     = class_export_get(exp);
875         new_fsc->mfc_ir_capable =
876                         !!(exp->exp_connect_flags & OBD_CONNECT_IMP_RECOV);
877
878         rc = -EEXIST;
879         cfs_mutex_lock(&fsdb->fsdb_mutex);
880
881         /* tend to find it in export list because this list is shorter. */
882         cfs_spin_lock(&data->med_lock);
883         cfs_list_for_each_entry(fsc, &data->med_clients, mfc_export_list) {
884                 if (strcmp(fsname, fsc->mfc_fsdb->fsdb_name) == 0) {
885                         found = true;
886                         break;
887                 }
888         }
889         if (!found) {
890                 fsc = new_fsc;
891                 new_fsc = NULL;
892
893                 /* add it into export list. */
894                 cfs_list_add(&fsc->mfc_export_list, &data->med_clients);
895
896                 /* add into fsdb list. */
897                 cfs_list_add(&fsc->mfc_fsdb_list, &fsdb->fsdb_clients);
898                 if (!fsc->mfc_ir_capable) {
899                         ++fsdb->fsdb_nonir_clients;
900                         if (fsdb->fsdb_ir_state == IR_FULL)
901                                 fsdb->fsdb_ir_state = IR_PARTIAL;
902                 }
903                 rc = 0;
904         }
905         cfs_spin_unlock(&data->med_lock);
906         cfs_mutex_unlock(&fsdb->fsdb_mutex);
907
908         if (new_fsc) {
909                 class_export_put(new_fsc->mfc_export);
910                 OBD_FREE_PTR(new_fsc);
911         }
912         RETURN(rc);
913 }
914
915 void mgs_fsc_cleanup(struct obd_export *exp)
916 {
917         struct mgs_export_data *data = &exp->u.eu_mgs_data;
918         struct mgs_fsc *fsc, *tmp;
919         CFS_LIST_HEAD(head);
920
921         cfs_spin_lock(&data->med_lock);
922         cfs_list_splice_init(&data->med_clients, &head);
923         cfs_spin_unlock(&data->med_lock);
924
925         cfs_list_for_each_entry_safe(fsc, tmp, &head, mfc_export_list) {
926                 struct fs_db *fsdb = fsc->mfc_fsdb;
927
928                 LASSERT(fsc->mfc_export == exp);
929
930                 cfs_mutex_lock(&fsdb->fsdb_mutex);
931                 cfs_list_del_init(&fsc->mfc_fsdb_list);
932                 if (fsc->mfc_ir_capable == 0) {
933                         --fsdb->fsdb_nonir_clients;
934                         LASSERT(fsdb->fsdb_ir_state != IR_FULL);
935                         if (fsdb->fsdb_nonir_clients == 0 &&
936                             fsdb->fsdb_ir_state == IR_PARTIAL)
937                                 fsdb->fsdb_ir_state = IR_FULL;
938                 }
939                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
940                 cfs_list_del_init(&fsc->mfc_export_list);
941                 class_export_put(fsc->mfc_export);
942                 OBD_FREE_PTR(fsc);
943         }
944 }
945
946 /* must be called with fsdb->fsdb_mutex held */
947 void mgs_fsc_cleanup_by_fsdb(struct fs_db *fsdb)
948 {
949         struct mgs_fsc *fsc, *tmp;
950
951         cfs_list_for_each_entry_safe(fsc, tmp, &fsdb->fsdb_clients,
952                                      mfc_fsdb_list) {
953                 struct mgs_export_data *data = &fsc->mfc_export->u.eu_mgs_data;
954
955                 LASSERT(fsdb == fsc->mfc_fsdb);
956                 cfs_list_del_init(&fsc->mfc_fsdb_list);
957
958                 cfs_spin_lock(&data->med_lock);
959                 cfs_list_del_init(&fsc->mfc_export_list);
960                 cfs_spin_unlock(&data->med_lock);
961                 class_export_put(fsc->mfc_export);
962                 OBD_FREE_PTR(fsc);
963         }
964
965         fsdb->fsdb_nonir_clients = 0;
966         if (fsdb->fsdb_ir_state == IR_PARTIAL)
967                 fsdb->fsdb_ir_state = IR_FULL;
968 }