Whamcloud - gitweb
LU-13550 osd-zfs: snapshot with incompatible clients
[fs/lustre-release.git] / lustre / mgs / mgs_barrier.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  *
25  * lustre/mgs/mgs_barrier.c
26  *
27  * Author: Fan, Yong <fan.yong@intel.com>
28  */
29
30 #define DEBUG_SUBSYSTEM S_MGS
31 #define D_MGS D_CONFIG
32
33 #include <uapi/linux/lustre/lustre_ioctl.h>
34 #include <lustre_swab.h>
35 #include <uapi/linux/lustre/lustre_barrier_user.h>
36
37 #include "mgs_internal.h"
38
39 /**
40  * Handle the barrier lock glimpse reply.
41  *
42  * The barrier lock glimpse reply contains the target MDT's index and
43  * the barrier operation status on such MDT. With such infomation. If
44  * the MDT given barrier status is the expected one, then set related
45  * 'fsdb''s barrier bitmap; otherwise record the failure or status.
46  *
47  * \param[in] env       pointer to the thread context
48  * \param[in] req       pointer to the glimpse callback RPC request
49  * \param[in] data      pointer the async glimpse callback data
50  * \param[in] rc        the glimpse callback RPC return value
51  *
52  * \retval              0 for success
53  * \retval              negative error number on failure
54  */
55 static int mgs_barrier_gl_interpret_reply(const struct lu_env *env,
56                                           struct ptlrpc_request *req,
57                                           void *data, int rc)
58 {
59         struct ldlm_cb_async_args *ca = data;
60         struct fs_db *fsdb = ca->ca_set_arg->gl_interpret_data;
61         struct barrier_lvb *lvb;
62         ENTRY;
63
64         if (rc) {
65                 if (rc == -ENODEV) {
66                         /* The lock is useless, cancel it. */
67                         ldlm_lock_cancel(ca->ca_lock);
68                         rc = 0;
69                 }
70
71                 GOTO(out, rc);
72         }
73
74         lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
75                                           lustre_swab_barrier_lvb);
76         if (!lvb)
77                 GOTO(out, rc = -EPROTO);
78
79         if (lvb->lvb_status == fsdb->fsdb_barrier_expected) {
80                 if (unlikely(lvb->lvb_index > INDEX_MAP_SIZE))
81                         rc = -EINVAL;
82                 else
83                         set_bit(lvb->lvb_index, fsdb->fsdb_barrier_map);
84         } else if (likely(!test_bit(lvb->lvb_index, fsdb->fsdb_barrier_map))) {
85                 fsdb->fsdb_barrier_result = lvb->lvb_status;
86         }
87
88         GOTO(out, rc);
89
90 out:
91         if (rc)
92                 fsdb->fsdb_barrier_result = rc;
93
94         return rc;
95 }
96
97 /**
98  * Send glimpse callback to the barrier locks holders.
99  *
100  * The glimpse callback takes the current barrier status. The barrier locks
101  * holders (on the MDTs) will take related barrier actions according to the
102  * given barrier status, then return their local barrier status.
103  *
104  * \param[in] env       pointer to the thread context
105  * \param[in] mgs       pointer to the MGS device
106  * \param[in] fsdb      pointer the barrier 'fsdb'
107  * \param[in] timeout   indicate when the barrier will be expired
108  * \param[in] expected  the expected barrier status on remote servers (MDTs)
109  *
110  * \retval              positive number for unexpected barrier status
111  * \retval              0 for success
112  * \retval              negative error number on failure
113  */
114 static int mgs_barrier_glimpse_lock(const struct lu_env *env,
115                                     struct mgs_device *mgs,
116                                     struct fs_db *fsdb,
117                                     __u32 timeout, __u32 expected)
118 {
119         union ldlm_gl_desc *desc = &mgs_env_info(env)->mgi_gl_desc;
120         struct ldlm_res_id res_id;
121         struct ldlm_resource *res;
122         struct ldlm_glimpse_work *work;
123         struct ldlm_glimpse_work *tmp;
124         LIST_HEAD(gl_list);
125         struct list_head *pos;
126         int i;
127         int rc;
128         ENTRY;
129
130         LASSERT(fsdb->fsdb_mdt_count > 0);
131
132         rc = mgc_logname2resid(fsdb->fsdb_name, &res_id, MGS_CFG_T_BARRIER);
133         if (rc)
134                 RETURN(rc);
135
136         res = ldlm_resource_get(mgs->mgs_obd->obd_namespace, NULL, &res_id,
137                                 LDLM_PLAIN, 0);
138         if (IS_ERR(res))
139                 RETURN(PTR_ERR(res));
140
141         fsdb->fsdb_barrier_result = 0;
142         fsdb->fsdb_barrier_expected = expected;
143         desc->barrier_desc.lgbd_status = fsdb->fsdb_barrier_status;
144         desc->barrier_desc.lgbd_timeout = timeout;
145
146 again:
147         list_for_each_entry(work, &gl_list, gl_list) {
148                 if (!work->gl_lock)
149                         break;
150
151                 LDLM_LOCK_RELEASE(work->gl_lock);
152                 work->gl_lock = NULL;
153         }
154
155         /* It is not big issue to alloc more work item than needed. */
156         for (i = 0; i < fsdb->fsdb_mdt_count; i++) {
157                 OBD_ALLOC_PTR(work);
158                 if (!work)
159                         GOTO(out, rc = -ENOMEM);
160
161                 list_add_tail(&work->gl_list, &gl_list);
162         }
163
164         work = list_entry(gl_list.next, struct ldlm_glimpse_work, gl_list);
165
166         lock_res(res);
167         list_for_each(pos, &res->lr_granted) {
168                 struct ldlm_lock *lock = list_entry(pos, struct ldlm_lock,
169                                                     l_res_link);
170
171                 work->gl_lock = LDLM_LOCK_GET(lock);
172                 work->gl_flags = 0;
173                 work->gl_desc = desc;
174                 work->gl_interpret_reply = mgs_barrier_gl_interpret_reply;
175                 work->gl_interpret_data = fsdb;
176
177                 if (unlikely(work->gl_list.next == &gl_list)) {
178                         if (likely(pos->next == &res->lr_granted))
179                                 break;
180
181                         unlock_res(res);
182                         /* The granted locks are more than the MDTs count. */
183                         goto again;
184                 }
185
186                 work = list_entry(work->gl_list.next, struct ldlm_glimpse_work,
187                                   gl_list);
188         }
189         unlock_res(res);
190
191         /* The MDTs count may be more than the granted locks. */
192         list_for_each_entry_safe_reverse(work, tmp, &gl_list, gl_list) {
193                 if (work->gl_lock)
194                         break;
195
196                 list_del(&work->gl_list);
197                 OBD_FREE_PTR(work);
198         }
199
200         if (!list_empty(&gl_list))
201                 rc = ldlm_glimpse_locks(res, &gl_list);
202         else
203                 rc = -ENODEV;
204
205         GOTO(out, rc);
206
207 out:
208         list_for_each_entry_safe(work, tmp, &gl_list, gl_list) {
209                 list_del(&work->gl_list);
210                 if (work->gl_lock)
211                         LDLM_LOCK_RELEASE(work->gl_lock);
212                 OBD_FREE_PTR(work);
213         }
214
215         ldlm_resource_putref(res);
216         if (!rc)
217                 rc = fsdb->fsdb_barrier_result;
218
219         return rc;
220 }
221
222 static void mgs_barrier_bitmap_setup(struct mgs_device *mgs,
223                                      struct fs_db *b_fsdb,
224                                      const char *name)
225 {
226         struct fs_db *c_fsdb;
227
228         c_fsdb = mgs_find_fsdb(mgs, name);
229         if (likely(c_fsdb)) {
230                 memcpy(b_fsdb->fsdb_mdt_index_map,
231                        c_fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
232                 b_fsdb->fsdb_mdt_count = c_fsdb->fsdb_mdt_count;
233                 mgs_put_fsdb(mgs, c_fsdb);
234         }
235 }
236
237 static bool mgs_barrier_done(struct fs_db *fsdb)
238 {
239         int i;
240
241         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
242                 if (test_bit(i, fsdb->fsdb_mdt_index_map) &&
243                     !test_bit(i, fsdb->fsdb_barrier_map))
244                         return false;
245         }
246
247         return true;
248 }
249
250 bool mgs_barrier_expired(struct fs_db *fsdb, time64_t timeout)
251 {
252         time64_t expired = fsdb->fsdb_barrier_latest_create_time + timeout;
253
254         return expired > ktime_get_real_seconds();
255 }
256
257 static inline bool mgs_barrier_tests_disabled(struct barrier_ctl *bc,
258                                               struct obd_export *exp,
259                                               struct fs_db *fsdb)
260 {
261         __u64 flags = exp_connect_flags(exp);
262
263         if ((flags & OBD_CONNECT_MDS_MDS) && !(flags & OBD_CONNECT_BARRIER)) {
264                 fsdb->fsdb_barrier_disabled = 1;
265                 LCONSOLE_WARN("%s: Barrier incompatible connection %s\n",
266                               bc->bc_name,
267                               obd_uuid2str(&exp->exp_client_uuid));
268                 return true;
269         }
270         return false;
271 }
272
273 /**
274  * Create the barrier for the given instance.
275  *
276  * We use two-phases barrier to guarantee that after the barrier setup:
277  * 1) All the server side pending async modification RPCs have been flushed.
278  * 2) Any subsequent modification will be blocked.
279  * 3) All async transactions on the MDTs have been committed.
280  *
281  * For phase1, we do the following:
282  *
283  * Firstly, it sets barrier flag on the instance that will block subsequent
284  * modifications from clients. (Note: server sponsored modification will be
285  * allowed for flush pending modifications)
286  *
287  * Secondly, it will flush all pending modification via dt_sync(), such as
288  * async OST-object destroy, async OST-object owner changes, and so on.
289  *
290  * If there are some on-handling clients sponsored modifications during the
291  * barrier creating, then related modifications may cause pending requests
292  * after the first dt_sync(), so call dt_sync() again after all on-handling
293  * modifications done.
294  *
295  * With the phase1 barrier set, all pending cross-servers modification RPCs
296  * have been flushed to remote servers, and any new modification will be
297  * blocked. But it does not guarantees that all the updates have been
298  * committed to storage on remote servers. So when all the instances have
299  * done phase1 barrier successfully, the MGS will notify all instances to
300  * do the phase2 barrier as following:
301  *
302  * Every barrier instance will call dt_sync() to make all async transactions
303  * to be committed locally.
304  *
305  * \param[in] env       pointer to the thread context
306  * \param[in] mgs       pointer to the MGS device
307  * \param[in] bc        pointer the barrier control structure
308  *
309  * \retval              0 for success
310  * \retval              negative error number on failure
311  */
312 static int mgs_barrier_freeze(const struct lu_env *env,
313                               struct mgs_device *mgs,
314                               struct barrier_ctl *bc)
315 {
316         char *name = mgs_env_info(env)->mgi_fsname;
317         struct fs_db *fsdb;
318         int rc = 0;
319         time64_t left;
320         bool phase1 = true;
321         bool dirty = false;
322         ENTRY;
323
324         snprintf(name, sizeof(mgs_env_info(env)->mgi_fsname) - 1, "%s-%s",
325                  bc->bc_name, BARRIER_FILENAME);
326
327         down_write(&mgs->mgs_barrier_rwsem);
328         mutex_lock(&mgs->mgs_mutex);
329
330         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, &fsdb);
331         if (rc) {
332                 mutex_unlock(&mgs->mgs_mutex);
333                 up_write(&mgs->mgs_barrier_rwsem);
334                 RETURN(rc);
335         }
336
337         if (unlikely(fsdb->fsdb_mdt_count == 0)) {
338                 mgs_barrier_bitmap_setup(mgs, fsdb, bc->bc_name);
339
340                 /* fsdb was just created, ensure that fsdb_barrier_disabled is
341                  * set correctly */
342                 if (fsdb->fsdb_mdt_count > 0) {
343                         struct obd_export *exp;
344                         struct obd_device *mgs_obd = mgs->mgs_obd;
345
346                         spin_lock(&mgs_obd->obd_dev_lock);
347                         list_for_each_entry(exp, &mgs_obd->obd_exports,
348                                             exp_obd_chain) {
349                                 if (mgs_barrier_tests_disabled(bc, exp, fsdb))
350                                         break;
351                         }
352                         spin_unlock(&mgs_obd->obd_dev_lock);
353                 }
354         }
355
356         mutex_lock(&fsdb->fsdb_mutex);
357         mutex_unlock(&mgs->mgs_mutex);
358
359         switch (fsdb->fsdb_barrier_status) {
360         case BS_THAWING:
361         case BS_RESCAN:
362                 rc = -EBUSY;
363                 break;
364         case BS_FREEZING_P1:
365         case BS_FREEZING_P2:
366                 rc = -EINPROGRESS;
367                 break;
368         case BS_FROZEN:
369                 if (mgs_barrier_expired(fsdb, fsdb->fsdb_barrier_timeout)) {
370                         rc = -EALREADY;
371                         break;
372                 }
373                 /* fallthrough */
374         case BS_INIT:
375         case BS_THAWED:
376         case BS_EXPIRED:
377         case BS_FAILED:
378                 if (fsdb->fsdb_barrier_disabled) {
379                         rc = -EOPNOTSUPP;
380                 } else if (unlikely(fsdb->fsdb_mdt_count == 0)) {
381                         rc = -ENODEV;
382                 } else {
383                         fsdb->fsdb_barrier_latest_create_time =
384                                 ktime_get_real_seconds();
385                         fsdb->fsdb_barrier_status = BS_FREEZING_P1;
386                         if (bc->bc_timeout != 0)
387                                 fsdb->fsdb_barrier_timeout = bc->bc_timeout;
388                         else
389                                 fsdb->fsdb_barrier_timeout =
390                                                 BARRIER_TIMEOUT_DEFAULT;
391                         memset(fsdb->fsdb_barrier_map, 0, INDEX_MAP_SIZE);
392                 }
393                 break;
394         default:
395                 LCONSOLE_WARN("%s: found unexpected barrier status %u\n",
396                               bc->bc_name, fsdb->fsdb_barrier_status);
397                 rc = -EINVAL;
398                 LBUG();
399         }
400
401         if (rc)
402                 GOTO(out, rc);
403
404         left = fsdb->fsdb_barrier_timeout;
405
406 again:
407         mutex_unlock(&fsdb->fsdb_mutex);
408         up_write(&mgs->mgs_barrier_rwsem);
409
410         CFS_FAIL_TIMEOUT(OBD_FAIL_BARRIER_DELAY, cfs_fail_val);
411
412         rc = mgs_barrier_glimpse_lock(env, mgs, fsdb, left,
413                                       phase1 ? BS_FREEZING_P1 : BS_FROZEN);
414         down_write(&mgs->mgs_barrier_rwsem);
415         mutex_lock(&fsdb->fsdb_mutex);
416
417         dirty = true;
418         left = fsdb->fsdb_barrier_latest_create_time +
419                fsdb->fsdb_barrier_timeout - ktime_get_real_seconds();
420         if (left <= 0) {
421                 fsdb->fsdb_barrier_status = BS_EXPIRED;
422
423                 GOTO(out, rc = -ETIME);
424         }
425
426         LASSERTF(fsdb->fsdb_barrier_status ==
427                  (phase1 ? BS_FREEZING_P1 : BS_FREEZING_P2),
428                  "unexpected barrier status %u\n",
429                  fsdb->fsdb_barrier_status);
430
431         if (rc == -ETIMEDOUT) {
432                 fsdb->fsdb_barrier_status = BS_EXPIRED;
433                 rc = -ETIME;
434         } else if (rc > 0) {
435                 fsdb->fsdb_barrier_status = rc;
436                 rc = -EREMOTE;
437         } else if (rc < 0) {
438                 fsdb->fsdb_barrier_status = BS_FAILED;
439         } else if (mgs_barrier_done(fsdb)) {
440                 if (phase1) {
441                         fsdb->fsdb_barrier_status = BS_FREEZING_P2;
442                         memset(fsdb->fsdb_barrier_map, 0,
443                                INDEX_MAP_SIZE);
444                         phase1 = false;
445
446                         goto again;
447                 } else {
448                         fsdb->fsdb_barrier_status = BS_FROZEN;
449                 }
450         } else {
451                 fsdb->fsdb_barrier_status = BS_FAILED;
452                 rc = -EREMOTE;
453         }
454
455         GOTO(out, rc);
456
457 out:
458         mutex_unlock(&fsdb->fsdb_mutex);
459         up_write(&mgs->mgs_barrier_rwsem);
460         if (rc && dirty) {
461                 memset(fsdb->fsdb_barrier_map, 0, INDEX_MAP_SIZE);
462                 mgs_barrier_glimpse_lock(env, mgs, fsdb, 0, BS_THAWED);
463         }
464
465         mgs_put_fsdb(mgs, fsdb);
466
467         return rc;
468 }
469
470 static int mgs_barrier_thaw(const struct lu_env *env,
471                             struct mgs_device *mgs,
472                             struct barrier_ctl *bc)
473 {
474         char *name = mgs_env_info(env)->mgi_fsname;
475         struct fs_db *fsdb;
476         int rc = 0;
477         ENTRY;
478
479         snprintf(name, sizeof(mgs_env_info(env)->mgi_fsname) - 1, "%s-%s",
480                  bc->bc_name, BARRIER_FILENAME);
481
482         down_write(&mgs->mgs_barrier_rwsem);
483         mutex_lock(&mgs->mgs_mutex);
484
485         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, &fsdb);
486         if (rc) {
487                 mutex_unlock(&mgs->mgs_mutex);
488                 up_write(&mgs->mgs_barrier_rwsem);
489                 RETURN(rc);
490         }
491
492         if (unlikely(fsdb->fsdb_mdt_count == 0)) {
493                 mgs_barrier_bitmap_setup(mgs, fsdb, bc->bc_name);
494
495                 /* fsdb was just created, ensure that fsdb_barrier_disabled is
496                  * set correctly */
497                 if (fsdb->fsdb_mdt_count > 0) {
498                         struct obd_export *exp;
499                         struct obd_device *mgs_obd = mgs->mgs_obd;
500
501                         spin_lock(&mgs_obd->obd_dev_lock);
502                         list_for_each_entry(exp, &mgs_obd->obd_exports,
503                                             exp_obd_chain) {
504                                 if (mgs_barrier_tests_disabled(bc, exp, fsdb))
505                                         break;
506                         }
507                         spin_unlock(&mgs_obd->obd_dev_lock);
508                 }
509         }
510
511         mutex_lock(&fsdb->fsdb_mutex);
512         mutex_unlock(&mgs->mgs_mutex);
513
514         switch (fsdb->fsdb_barrier_status) {
515         case BS_FREEZING_P1:
516         case BS_FREEZING_P2:
517         case BS_RESCAN:
518                 rc = -EBUSY;
519                 break;
520         case BS_INIT:
521         case BS_THAWED:
522                 rc = -EALREADY;
523                 break;
524         case BS_THAWING:
525                 rc = -EINPROGRESS;
526                 break;
527         case BS_FROZEN:
528         case BS_EXPIRED: /* The barrier on some MDT(s) may be expired,
529                           * but may be not on others. Destory anyway. */
530         case BS_FAILED:
531                 if (unlikely(fsdb->fsdb_mdt_count == 0)) {
532                         rc = -ENODEV;
533                 } else {
534                         fsdb->fsdb_barrier_status = BS_THAWING;
535                         memset(fsdb->fsdb_barrier_map, 0, INDEX_MAP_SIZE);
536                 }
537                 break;
538         default:
539                 LCONSOLE_WARN("%s: found unexpected barrier status %u\n",
540                               bc->bc_name, fsdb->fsdb_barrier_status);
541                 rc = -EINVAL;
542                 LBUG();
543         }
544
545         if (rc)
546                 GOTO(out, rc);
547
548         mutex_unlock(&fsdb->fsdb_mutex);
549         up_write(&mgs->mgs_barrier_rwsem);
550
551         CFS_FAIL_TIMEOUT(OBD_FAIL_BARRIER_DELAY, cfs_fail_val);
552
553         rc = mgs_barrier_glimpse_lock(env, mgs, fsdb, 0, BS_THAWED);
554         down_write(&mgs->mgs_barrier_rwsem);
555         mutex_lock(&fsdb->fsdb_mutex);
556
557         LASSERTF(fsdb->fsdb_barrier_status == BS_THAWING,
558                  "unexpected barrier status %u\n",
559                  fsdb->fsdb_barrier_status);
560
561         if (rc > 0) {
562                 fsdb->fsdb_barrier_status = rc;
563                 rc = -EREMOTE;
564         } else if (rc < 0) {
565                 fsdb->fsdb_barrier_status = BS_FAILED;
566         } else if (mgs_barrier_done(fsdb)) {
567                 fsdb->fsdb_barrier_status = BS_THAWED;
568         } else {
569                 fsdb->fsdb_barrier_status = BS_FAILED;
570                 rc = -EREMOTE;
571         }
572
573         GOTO(out, rc);
574
575 out:
576         mutex_unlock(&fsdb->fsdb_mutex);
577         up_write(&mgs->mgs_barrier_rwsem);
578         mgs_put_fsdb(mgs, fsdb);
579
580         return rc;
581 }
582
583 static int mgs_barrier_stat(const struct lu_env *env,
584                             struct mgs_device *mgs,
585                             struct barrier_ctl *bc)
586 {
587         char *name = mgs_env_info(env)->mgi_fsname;
588         struct fs_db *fsdb;
589         ENTRY;
590
591         snprintf(name, sizeof(mgs_env_info(env)->mgi_fsname) - 1, "%s-%s",
592                  bc->bc_name, BARRIER_FILENAME);
593
594         mutex_lock(&mgs->mgs_mutex);
595
596         fsdb = mgs_find_fsdb(mgs, name);
597         if (fsdb) {
598                 mutex_lock(&fsdb->fsdb_mutex);
599                 mutex_unlock(&mgs->mgs_mutex);
600
601                 bc->bc_status = fsdb->fsdb_barrier_status;
602                 if (bc->bc_status == BS_FREEZING_P1 ||
603                     bc->bc_status == BS_FREEZING_P2 ||
604                     bc->bc_status == BS_FROZEN) {
605                         if (mgs_barrier_expired(fsdb, fsdb->fsdb_barrier_timeout))
606                                 bc->bc_timeout =
607                                         fsdb->fsdb_barrier_latest_create_time +
608                                         fsdb->fsdb_barrier_timeout -
609                                         ktime_get_real_seconds();
610                         else
611                                 bc->bc_status = fsdb->fsdb_barrier_status =
612                                         BS_EXPIRED;
613                 }
614
615                 mutex_unlock(&fsdb->fsdb_mutex);
616                 mgs_put_fsdb(mgs, fsdb);
617         } else {
618                 mutex_unlock(&mgs->mgs_mutex);
619
620                 bc->bc_status = BS_INIT;
621         }
622
623         RETURN(0);
624 }
625
626 static int mgs_barrier_rescan(const struct lu_env *env,
627                               struct mgs_device *mgs,
628                               struct barrier_ctl *bc)
629 {
630         char *name = mgs_env_info(env)->mgi_fsname;
631         struct fs_db *b_fsdb;
632         struct fs_db *c_fsdb;
633         int rc = 0;
634         ENTRY;
635
636         down_write(&mgs->mgs_barrier_rwsem);
637         mutex_lock(&mgs->mgs_mutex);
638
639         c_fsdb = mgs_find_fsdb(mgs, bc->bc_name);
640         if (!c_fsdb || unlikely(c_fsdb->fsdb_mdt_count == 0)) {
641                 mutex_unlock(&mgs->mgs_mutex);
642                 up_write(&mgs->mgs_barrier_rwsem);
643
644                 RETURN(-ENODEV);
645         }
646
647         snprintf(name, sizeof(mgs_env_info(env)->mgi_fsname) - 1, "%s-%s",
648                  bc->bc_name, BARRIER_FILENAME);
649         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, &b_fsdb);
650         if (rc) {
651                 mutex_unlock(&mgs->mgs_mutex);
652                 up_write(&mgs->mgs_barrier_rwsem);
653                 mgs_put_fsdb(mgs, c_fsdb);
654                 RETURN(rc);
655         }
656
657         if (unlikely(b_fsdb->fsdb_mdt_count == 0 &&
658                      c_fsdb->fsdb_mdt_count > 0)) {
659                 /* fsdb was just created, ensure that fsdb_barrier_disabled is
660                  * set correctly */
661                 struct obd_export *exp;
662                 struct obd_device *mgs_obd = mgs->mgs_obd;
663
664                 spin_lock(&mgs_obd->obd_dev_lock);
665                 list_for_each_entry(exp, &mgs_obd->obd_exports,
666                                     exp_obd_chain) {
667                         if (mgs_barrier_tests_disabled(bc, exp, b_fsdb))
668                                 break;
669                 }
670                 spin_unlock(&mgs_obd->obd_dev_lock);
671         }
672
673         mutex_lock(&b_fsdb->fsdb_mutex);
674         mutex_lock(&c_fsdb->fsdb_mutex);
675         mutex_unlock(&mgs->mgs_mutex);
676
677         switch (b_fsdb->fsdb_barrier_status) {
678         case BS_RESCAN:
679                 rc = -EINPROGRESS;
680                 break;
681         case BS_THAWING:
682         case BS_FREEZING_P1:
683         case BS_FREEZING_P2:
684                 rc = -EBUSY;
685                 break;
686         case BS_FROZEN:
687                 if (mgs_barrier_expired(b_fsdb, b_fsdb->fsdb_barrier_timeout)) {
688                         rc = -EBUSY;
689                         break;
690                 }
691                 /* fallthrough */
692         case BS_INIT:
693         case BS_THAWED:
694         case BS_EXPIRED:
695         case BS_FAILED:
696                 b_fsdb->fsdb_barrier_latest_create_time = ktime_get_real_seconds();
697                 b_fsdb->fsdb_barrier_status = BS_RESCAN;
698                 memcpy(b_fsdb->fsdb_mdt_index_map, c_fsdb->fsdb_mdt_index_map,
699                        INDEX_MAP_SIZE);
700                 memset(b_fsdb->fsdb_barrier_map, 0, INDEX_MAP_SIZE);
701                 b_fsdb->fsdb_mdt_count = c_fsdb->fsdb_mdt_count;
702                 break;
703         default:
704                 LCONSOLE_WARN("%s: found unexpected barrier status %u\n",
705                               bc->bc_name, b_fsdb->fsdb_barrier_status);
706                 rc = -EINVAL;
707                 LBUG();
708         }
709
710         mutex_unlock(&c_fsdb->fsdb_mutex);
711         mgs_put_fsdb(mgs, c_fsdb);
712
713         if (rc)
714                 GOTO(out, rc);
715
716 again:
717         mutex_unlock(&b_fsdb->fsdb_mutex);
718         up_write(&mgs->mgs_barrier_rwsem);
719         rc = mgs_barrier_glimpse_lock(env, mgs, b_fsdb, 0, BS_INIT);
720         down_write(&mgs->mgs_barrier_rwsem);
721         mutex_lock(&b_fsdb->fsdb_mutex);
722
723         LASSERTF(b_fsdb->fsdb_barrier_status == BS_RESCAN,
724                  "unexpected barrier status %u\n",
725                  b_fsdb->fsdb_barrier_status);
726
727         if (rc > 0) {
728                 b_fsdb->fsdb_barrier_status = rc;
729                 rc = -EREMOTE;
730         } else if (rc == -ETIMEDOUT &&
731                    mgs_barrier_expired(b_fsdb, bc->bc_timeout)) {
732                 memset(b_fsdb->fsdb_barrier_map, 0, INDEX_MAP_SIZE);
733
734                 goto again;
735         } else if (rc < 0 && rc != -ETIMEDOUT && rc != -ENODEV) {
736                 b_fsdb->fsdb_barrier_status = BS_FAILED;
737         } else {
738                 int i;
739
740                 b_fsdb->fsdb_mdt_count = 0;
741                 bc->bc_total = 0;
742                 bc->bc_absence = 0;
743                 rc = 0;
744                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
745                         if (test_bit(i, b_fsdb->fsdb_barrier_map)) {
746                                 b_fsdb->fsdb_mdt_count++;
747                         } else if (test_bit(i, b_fsdb->fsdb_mdt_index_map)) {
748                                 b_fsdb->fsdb_mdt_count++;
749                                 bc->bc_absence++;
750                         }
751                 }
752
753                 bc->bc_total = b_fsdb->fsdb_mdt_count;
754                 memcpy(b_fsdb->fsdb_mdt_index_map,
755                        b_fsdb->fsdb_barrier_map, INDEX_MAP_SIZE);
756                 b_fsdb->fsdb_barrier_status = BS_INIT;
757         }
758
759         GOTO(out, rc);
760
761 out:
762         mutex_unlock(&b_fsdb->fsdb_mutex);
763         up_write(&mgs->mgs_barrier_rwsem);
764         mgs_put_fsdb(mgs, b_fsdb);
765
766         return rc;
767 }
768
769 int mgs_iocontrol_barrier(const struct lu_env *env,
770                           struct mgs_device *mgs,
771                           struct obd_ioctl_data *data)
772 {
773         struct barrier_ctl *bc = (struct barrier_ctl *)(data->ioc_inlbuf1);
774         int rc;
775         ENTRY;
776
777         if (unlikely(bc->bc_version != BARRIER_VERSION_V1))
778                 RETURN(-EOPNOTSUPP);
779
780         if (unlikely(bc->bc_name[0] == '\0' ||
781                      strnlen(bc->bc_name, sizeof(bc->bc_name)) > 8))
782                 RETURN(-EINVAL);
783
784         /* NOT allow barrier operations during recovery. */
785         if (unlikely(mgs->mgs_obd->obd_recovering))
786                 RETURN(-EBUSY);
787
788         switch (bc->bc_cmd) {
789         case BC_FREEZE:
790                 rc = mgs_barrier_freeze(env, mgs, bc);
791                 break;
792         case BC_THAW:
793                 rc = mgs_barrier_thaw(env, mgs, bc);
794                 break;
795         case BC_STAT:
796                 rc = mgs_barrier_stat(env, mgs, bc);
797                 break;
798         case BC_RESCAN:
799                 rc = mgs_barrier_rescan(env, mgs, bc);
800                 break;
801         default:
802                 rc = -EINVAL;
803                 break;
804         }
805
806         RETURN(rc);
807 }