Whamcloud - gitweb
LU-12635 build: Support for gcc -Wimplicit-fallthrough
[fs/lustre-release.git] / lustre / target / barrier.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  *
25  * lustre/target/barrier.c
26  *
27  * Currently, the Lustre barrier is implemented as write barrier on all MDTs.
28  * For each MDT in the system, when it starts, it registers a barrier instance
29  * that will be used in handling subsequent barrier requests.
30  *
31  * Author: Fan, Yong <fan.yong@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_SNAPSHOT
35
36 #include <linux/percpu_counter.h>
37
38 #include <dt_object.h>
39 #include <obd.h>
40 #include <obd_class.h>
41 #include <lustre_barrier.h>
42 #include <uapi/linux/lustre/lustre_barrier_user.h>
43
44 static LIST_HEAD(barrier_instance_list);
45 static DEFINE_SPINLOCK(barrier_instance_lock);
46
47 struct barrier_instance {
48         struct list_head         bi_link;
49         struct dt_device        *bi_bottom;
50         struct dt_device        *bi_next;
51         wait_queue_head_t        bi_waitq;
52         rwlock_t                 bi_rwlock;
53         struct percpu_counter    bi_writers;
54         atomic_t                 bi_ref;
55         time64_t                 bi_deadline;
56         __u32                    bi_status;
57 };
58
59 static inline char *barrier_barrier2name(struct barrier_instance *barrier)
60 {
61         return barrier->bi_bottom->dd_lu_dev.ld_obd->obd_name;
62 }
63
64 static inline __u32 barrier_dev_idx(struct barrier_instance *barrier)
65 {
66         return lu_site2seq(barrier->bi_bottom->dd_lu_dev.ld_site)->ss_node_id;
67 }
68
69 static void barrier_instance_cleanup(struct barrier_instance *barrier)
70 {
71         LASSERT(list_empty(&barrier->bi_link));
72
73         percpu_counter_destroy(&barrier->bi_writers);
74         OBD_FREE_PTR(barrier);
75 }
76
77 static inline void barrier_instance_put(struct barrier_instance *barrier)
78 {
79         if (atomic_dec_and_test(&barrier->bi_ref))
80                 barrier_instance_cleanup(barrier);
81 }
82
83 static struct barrier_instance *
84 barrier_instance_find_locked(struct dt_device *key)
85 {
86         struct barrier_instance *barrier;
87
88         list_for_each_entry(barrier, &barrier_instance_list, bi_link) {
89                 if (barrier->bi_bottom == key)
90                         return barrier;
91         }
92
93         return NULL;
94 }
95
96 static void barrier_instance_add(struct barrier_instance *barrier)
97 {
98         struct barrier_instance *tmp;
99
100         spin_lock(&barrier_instance_lock);
101         tmp = barrier_instance_find_locked(barrier->bi_bottom);
102         LASSERT(!tmp);
103
104         list_add_tail(&barrier->bi_link, &barrier_instance_list);
105         spin_unlock(&barrier_instance_lock);
106 }
107
108 static struct barrier_instance *barrier_instance_find(struct dt_device *key)
109 {
110         struct barrier_instance *barrier;
111
112         spin_lock(&barrier_instance_lock);
113         barrier = barrier_instance_find_locked(key);
114         if (barrier)
115                 atomic_inc(&barrier->bi_ref);
116         spin_unlock(&barrier_instance_lock);
117
118         return barrier;
119 }
120
121 static void barrier_set(struct barrier_instance *barrier, __u32 status)
122 {
123         if (barrier->bi_status != status) {
124                 CDEBUG(D_SNAPSHOT, "%s: change barrier status from %u to %u\n",
125                        barrier_barrier2name(barrier),
126                        barrier->bi_status, status);
127
128                 barrier->bi_status = status;
129         }
130 }
131
132 /**
133  * Create the barrier for the given instance.
134  *
135  * We use two-phases barrier to guarantee that after the barrier setup:
136  * 1) All the MDT side pending async modification have been flushed.
137  * 2) Any subsequent modification will be blocked.
138  * 3) All async transactions on the MDTs have been committed.
139  *
140  * For phase1, we do the following:
141  *
142  * Firstly, it sets barrier flag on the instance that will block subsequent
143  * modifications from clients. (Note: server sponsored modification will be
144  * allowed for flush pending modifications)
145  *
146  * Secondly, it will flush all pending modification via dt_sync(), such as
147  * async OST-object destroy, async OST-object owner changes, and so on.
148  *
149  * If there are some on-handling clients sponsored modifications during the
150  * barrier freezing, then related modifications may cause pending requests
151  * after the first dt_sync(), so call dt_sync() again after all on-handling
152  * modifications done.
153  *
154  * With the phase1 barrier set, all pending cross-servers modification have
155  * been flushed to remote servers, and any new modification will be blocked.
156  * But it does not guarantees that all the updates have been committed to
157  * storage on remote servers. So when all the instances have done phase1
158  * barrier successfully, the MGS will notify all instances to do the phase2
159  * barrier as following:
160  *
161  * Every barrier instance will call dt_sync() to make all async transactions
162  * to be committed locally.
163  *
164  * \param[in] env       pointer to the thread context
165  * \param[in] barrier   pointer to the barrier instance
166  * \param[in] phase1    indicate whether it is phase1 barrier or not
167  *
168  * \retval              positive number for timeout
169  * \retval              0 for success
170  * \retval              negative error number on failure
171  */
172 static int barrier_freeze(const struct lu_env *env,
173                           struct barrier_instance *barrier, bool phase1)
174 {
175         time64_t left;
176         int rc = 0;
177         __s64 inflight = 0;
178         ENTRY;
179
180         write_lock(&barrier->bi_rwlock);
181         barrier_set(barrier, phase1 ? BS_FREEZING_P1 : BS_FREEZING_P2);
182
183         /* Avoid out-of-order execution the barrier_set()
184          * and the check of inflight modifications count. */
185         smp_mb();
186
187         if (phase1)
188                 inflight = percpu_counter_sum(&barrier->bi_writers);
189         write_unlock(&barrier->bi_rwlock);
190
191         rc = dt_sync(env, barrier->bi_next);
192         if (rc)
193                 RETURN(rc);
194
195         LASSERT(barrier->bi_deadline != 0);
196
197         left = barrier->bi_deadline - ktime_get_real_seconds();
198         if (left <= 0)
199                 RETURN(1);
200
201         if (phase1 && inflight != 0) {
202                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(left),
203                                                      NULL, NULL);
204
205                 rc = l_wait_event(barrier->bi_waitq,
206                                   percpu_counter_sum(&barrier->bi_writers) == 0,
207                                   &lwi);
208                 if (rc)
209                         RETURN(1);
210
211                 /* sync again after all inflight modifications done. */
212                 rc = dt_sync(env, barrier->bi_next);
213                 if (rc)
214                         RETURN(rc);
215
216                 if (ktime_get_real_seconds() > barrier->bi_deadline)
217                         RETURN(1);
218         }
219
220         CDEBUG(D_SNAPSHOT, "%s: barrier freezing %s done.\n",
221                barrier_barrier2name(barrier), phase1 ? "phase1" : "phase2");
222
223         if (!phase1)
224                 barrier_set(barrier, BS_FROZEN);
225
226         RETURN(0);
227 }
228
229 void barrier_init(void)
230 {
231 }
232
233 void barrier_fini(void)
234 {
235         LASSERT(list_empty(&barrier_instance_list));
236 }
237
238 bool barrier_entry(struct dt_device *key)
239 {
240         struct barrier_instance *barrier;
241         bool entered = false;
242         ENTRY;
243
244         barrier = barrier_instance_find(key);
245         if (unlikely(!barrier))
246                 /* Fail open */
247                 RETURN(true);
248
249         read_lock(&barrier->bi_rwlock);
250         if (likely(barrier->bi_status != BS_FREEZING_P1 &&
251                    barrier->bi_status != BS_FREEZING_P2 &&
252                    barrier->bi_status != BS_FROZEN) ||
253             ktime_get_real_seconds() > barrier->bi_deadline) {
254                 percpu_counter_inc(&barrier->bi_writers);
255                 entered = true;
256         }
257         read_unlock(&barrier->bi_rwlock);
258
259         barrier_instance_put(barrier);
260         return entered;
261 }
262 EXPORT_SYMBOL(barrier_entry);
263
264 void barrier_exit(struct dt_device *key)
265 {
266         struct barrier_instance *barrier;
267
268         barrier = barrier_instance_find(key);
269         if (likely(barrier)) {
270                 percpu_counter_dec(&barrier->bi_writers);
271
272                 /* Avoid out-of-order execution the decreasing inflight
273                  * modifications count and the check of barrier status. */
274                 smp_mb();
275
276                 if (unlikely(barrier->bi_status == BS_FREEZING_P1))
277                         wake_up_all(&barrier->bi_waitq);
278                 barrier_instance_put(barrier);
279         }
280 }
281 EXPORT_SYMBOL(barrier_exit);
282
283 int barrier_handler(struct dt_device *key, struct ptlrpc_request *req)
284 {
285         struct ldlm_gl_barrier_desc *desc;
286         struct barrier_instance *barrier;
287         struct barrier_lvb *lvb;
288         struct lu_env env;
289         int rc = 0;
290         ENTRY;
291
292         /* glimpse on barrier locks always packs a glimpse descriptor */
293         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK_DESC);
294         desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
295         if (!desc)
296                 GOTO(out, rc = -EPROTO);
297
298         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
299                               sizeof(struct barrier_lvb));
300         rc = req_capsule_server_pack(&req->rq_pill);
301         if (rc)
302                 GOTO(out, rc);
303
304         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
305         barrier = barrier_instance_find(key);
306         if (!barrier)
307                 GOTO(out, rc = -ENODEV);
308
309         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
310         if (rc)
311                 GOTO(out_barrier, rc);
312
313         CDEBUG(D_SNAPSHOT,
314                "%s: handling barrier request: status %u, timeout %u\n",
315                barrier_barrier2name(barrier),
316                desc->lgbd_status, desc->lgbd_timeout);
317
318         switch (desc->lgbd_status) {
319         case BS_RESCAN:
320                 barrier_set(barrier, BS_INIT);
321                 break;
322         case BS_FREEZING_P1:
323         case BS_FREEZING_P2:
324                 if (OBD_FAIL_CHECK(OBD_FAIL_BARRIER_FAILURE))
325                         GOTO(fini, rc = -EINVAL);
326
327                 barrier->bi_deadline = ktime_get_real_seconds() +
328                                        desc->lgbd_timeout;
329                 rc = barrier_freeze(&env, barrier,
330                                     desc->lgbd_status == BS_FREEZING_P1);
331                 break;
332         case BS_THAWING:
333         case BS_FAILED:
334         case BS_EXPIRED:
335                 barrier_set(barrier, BS_THAWED);
336                 break;
337         default:
338                 CWARN("%s: unexpected barrier status %u\n",
339                       barrier_barrier2name(barrier), desc->lgbd_status);
340                 rc = -EINVAL;
341                 break;
342         }
343
344         GOTO(fini, rc);
345
346 fini:
347         lu_env_fini(&env);
348
349 out_barrier:
350         if (rc < 0)
351                 barrier_set(barrier, BS_FAILED);
352         else if (rc > 0)
353                 barrier_set(barrier, BS_EXPIRED);
354
355         lvb->lvb_status = barrier->bi_status;
356         lvb->lvb_index = barrier_dev_idx(barrier);
357
358         CDEBUG(D_SNAPSHOT, "%s: handled barrier request: status %u, "
359                "deadline %lld: rc = %d\n", barrier_barrier2name(barrier),
360                lvb->lvb_status, barrier->bi_deadline, rc);
361
362         barrier_instance_put(barrier);
363         rc = 0;
364
365 out:
366         req->rq_status = rc;
367         return rc;
368 }
369 EXPORT_SYMBOL(barrier_handler);
370
371 int barrier_register(struct dt_device *key, struct dt_device *next)
372 {
373         struct barrier_instance *barrier;
374         int rc;
375         ENTRY;
376
377         OBD_ALLOC_PTR(barrier);
378         if (!barrier)
379                 RETURN(-ENOMEM);
380
381         INIT_LIST_HEAD(&barrier->bi_link);
382         barrier->bi_bottom = key;
383         barrier->bi_next = next;
384         init_waitqueue_head(&barrier->bi_waitq);
385         rwlock_init(&barrier->bi_rwlock);
386         atomic_set(&barrier->bi_ref, 1);
387 #ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
388         rc = percpu_counter_init(&barrier->bi_writers, 0, GFP_KERNEL);
389 #else
390         rc = percpu_counter_init(&barrier->bi_writers, 0);
391 #endif
392         if (rc)
393                 barrier_instance_cleanup(barrier);
394         else
395                 barrier_instance_add(barrier);
396
397         RETURN(rc);
398 }
399 EXPORT_SYMBOL(barrier_register);
400
401 void barrier_deregister(struct dt_device *key)
402 {
403         struct barrier_instance *barrier;
404
405         spin_lock(&barrier_instance_lock);
406         barrier = barrier_instance_find_locked(key);
407         if (barrier)
408                 list_del_init(&barrier->bi_link);
409         spin_unlock(&barrier_instance_lock);
410
411         if (barrier)
412                 barrier_instance_put(barrier);
413 }
414 EXPORT_SYMBOL(barrier_deregister);