Whamcloud - gitweb
LU-14352 various: only use wake_up_all() on exclusive waitqs
[fs/lustre-release.git] / lustre / target / barrier.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  *
25  * lustre/target/barrier.c
26  *
27  * Currently, the Lustre barrier is implemented as write barrier on all MDTs.
28  * For each MDT in the system, when it starts, it registers a barrier instance
29  * that will be used in handling subsequent barrier requests.
30  *
31  * Author: Fan, Yong <fan.yong@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_SNAPSHOT
35
36 #include <linux/percpu_counter.h>
37
38 #include <dt_object.h>
39 #include <obd.h>
40 #include <obd_class.h>
41 #include <lustre_barrier.h>
42 #include <uapi/linux/lustre/lustre_barrier_user.h>
43
44 static LIST_HEAD(barrier_instance_list);
45 static DEFINE_SPINLOCK(barrier_instance_lock);
46
47 struct barrier_instance {
48         struct list_head         bi_link;
49         struct dt_device        *bi_bottom;
50         struct dt_device        *bi_next;
51         wait_queue_head_t        bi_waitq;
52         rwlock_t                 bi_rwlock;
53         struct percpu_counter    bi_writers;
54         atomic_t                 bi_ref;
55         time64_t                 bi_deadline;
56         __u32                    bi_status;
57 };
58
59 static inline char *barrier_barrier2name(struct barrier_instance *barrier)
60 {
61         return barrier->bi_bottom->dd_lu_dev.ld_obd->obd_name;
62 }
63
64 static inline __u32 barrier_dev_idx(struct barrier_instance *barrier)
65 {
66         return lu_site2seq(barrier->bi_bottom->dd_lu_dev.ld_site)->ss_node_id;
67 }
68
69 static void barrier_instance_cleanup(struct barrier_instance *barrier)
70 {
71         LASSERT(list_empty(&barrier->bi_link));
72
73         percpu_counter_destroy(&barrier->bi_writers);
74         OBD_FREE_PTR(barrier);
75 }
76
77 static inline void barrier_instance_put(struct barrier_instance *barrier)
78 {
79         if (atomic_dec_and_test(&barrier->bi_ref))
80                 barrier_instance_cleanup(barrier);
81 }
82
83 static struct barrier_instance *
84 barrier_instance_find_locked(struct dt_device *key)
85 {
86         struct barrier_instance *barrier;
87
88         list_for_each_entry(barrier, &barrier_instance_list, bi_link) {
89                 if (barrier->bi_bottom == key)
90                         return barrier;
91         }
92
93         return NULL;
94 }
95
96 static void barrier_instance_add(struct barrier_instance *barrier)
97 {
98         struct barrier_instance *tmp;
99
100         spin_lock(&barrier_instance_lock);
101         tmp = barrier_instance_find_locked(barrier->bi_bottom);
102         LASSERT(!tmp);
103
104         list_add_tail(&barrier->bi_link, &barrier_instance_list);
105         spin_unlock(&barrier_instance_lock);
106 }
107
108 static struct barrier_instance *barrier_instance_find(struct dt_device *key)
109 {
110         struct barrier_instance *barrier;
111
112         spin_lock(&barrier_instance_lock);
113         barrier = barrier_instance_find_locked(key);
114         if (barrier)
115                 atomic_inc(&barrier->bi_ref);
116         spin_unlock(&barrier_instance_lock);
117
118         return barrier;
119 }
120
121 static void barrier_set(struct barrier_instance *barrier, __u32 status)
122 {
123         if (barrier->bi_status != status) {
124                 CDEBUG(D_SNAPSHOT, "%s: change barrier status from %u to %u\n",
125                        barrier_barrier2name(barrier),
126                        barrier->bi_status, status);
127
128                 barrier->bi_status = status;
129         }
130 }
131
132 /**
133  * Create the barrier for the given instance.
134  *
135  * We use two-phases barrier to guarantee that after the barrier setup:
136  * 1) All the MDT side pending async modification have been flushed.
137  * 2) Any subsequent modification will be blocked.
138  * 3) All async transactions on the MDTs have been committed.
139  *
140  * For phase1, we do the following:
141  *
142  * Firstly, it sets barrier flag on the instance that will block subsequent
143  * modifications from clients. (Note: server sponsored modification will be
144  * allowed for flush pending modifications)
145  *
146  * Secondly, it will flush all pending modification via dt_sync(), such as
147  * async OST-object destroy, async OST-object owner changes, and so on.
148  *
149  * If there are some on-handling clients sponsored modifications during the
150  * barrier freezing, then related modifications may cause pending requests
151  * after the first dt_sync(), so call dt_sync() again after all on-handling
152  * modifications done.
153  *
154  * With the phase1 barrier set, all pending cross-servers modification have
155  * been flushed to remote servers, and any new modification will be blocked.
156  * But it does not guarantees that all the updates have been committed to
157  * storage on remote servers. So when all the instances have done phase1
158  * barrier successfully, the MGS will notify all instances to do the phase2
159  * barrier as following:
160  *
161  * Every barrier instance will call dt_sync() to make all async transactions
162  * to be committed locally.
163  *
164  * \param[in] env       pointer to the thread context
165  * \param[in] barrier   pointer to the barrier instance
166  * \param[in] phase1    indicate whether it is phase1 barrier or not
167  *
168  * \retval              positive number for timeout
169  * \retval              0 for success
170  * \retval              negative error number on failure
171  */
172 static int barrier_freeze(const struct lu_env *env,
173                           struct barrier_instance *barrier, bool phase1)
174 {
175         time64_t left;
176         int rc = 0;
177         __s64 inflight = 0;
178         ENTRY;
179
180         write_lock(&barrier->bi_rwlock);
181         barrier_set(barrier, phase1 ? BS_FREEZING_P1 : BS_FREEZING_P2);
182
183         /* Avoid out-of-order execution the barrier_set()
184          * and the check of inflight modifications count. */
185         smp_mb();
186
187         if (phase1)
188                 inflight = percpu_counter_sum(&barrier->bi_writers);
189         write_unlock(&barrier->bi_rwlock);
190
191         rc = dt_sync(env, barrier->bi_next);
192         if (rc)
193                 RETURN(rc);
194
195         LASSERT(barrier->bi_deadline != 0);
196
197         left = barrier->bi_deadline - ktime_get_real_seconds();
198         if (left <= 0)
199                 RETURN(1);
200
201         if (phase1 && inflight != 0) {
202                 rc = wait_event_idle_timeout(
203                         barrier->bi_waitq,
204                         percpu_counter_sum(&barrier->bi_writers) == 0,
205                         cfs_time_seconds(left));
206                 if (rc <= 0)
207                         RETURN(1);
208
209                 /* sync again after all inflight modifications done. */
210                 rc = dt_sync(env, barrier->bi_next);
211                 if (rc)
212                         RETURN(rc);
213
214                 if (ktime_get_real_seconds() > barrier->bi_deadline)
215                         RETURN(1);
216         }
217
218         CDEBUG(D_SNAPSHOT, "%s: barrier freezing %s done.\n",
219                barrier_barrier2name(barrier), phase1 ? "phase1" : "phase2");
220
221         if (!phase1)
222                 barrier_set(barrier, BS_FROZEN);
223
224         RETURN(0);
225 }
226
227 void barrier_init(void)
228 {
229 }
230
231 void barrier_fini(void)
232 {
233         LASSERT(list_empty(&barrier_instance_list));
234 }
235
236 bool barrier_entry(struct dt_device *key)
237 {
238         struct barrier_instance *barrier;
239         bool entered = false;
240         ENTRY;
241
242         barrier = barrier_instance_find(key);
243         if (unlikely(!barrier))
244                 /* Fail open */
245                 RETURN(true);
246
247         read_lock(&barrier->bi_rwlock);
248         if (likely(barrier->bi_status != BS_FREEZING_P1 &&
249                    barrier->bi_status != BS_FREEZING_P2 &&
250                    barrier->bi_status != BS_FROZEN) ||
251             ktime_get_real_seconds() > barrier->bi_deadline) {
252                 percpu_counter_inc(&barrier->bi_writers);
253                 entered = true;
254         }
255         read_unlock(&barrier->bi_rwlock);
256
257         barrier_instance_put(barrier);
258         return entered;
259 }
260 EXPORT_SYMBOL(barrier_entry);
261
262 void barrier_exit(struct dt_device *key)
263 {
264         struct barrier_instance *barrier;
265
266         barrier = barrier_instance_find(key);
267         if (likely(barrier)) {
268                 percpu_counter_dec(&barrier->bi_writers);
269
270                 /* Avoid out-of-order execution the decreasing inflight
271                  * modifications count and the check of barrier status. */
272                 smp_mb();
273
274                 if (unlikely(barrier->bi_status == BS_FREEZING_P1))
275                         wake_up(&barrier->bi_waitq);
276                 barrier_instance_put(barrier);
277         }
278 }
279 EXPORT_SYMBOL(barrier_exit);
280
281 int barrier_handler(struct dt_device *key, struct ptlrpc_request *req)
282 {
283         struct ldlm_gl_barrier_desc *desc;
284         struct barrier_instance *barrier;
285         struct barrier_lvb *lvb;
286         struct lu_env env;
287         int rc = 0;
288         ENTRY;
289
290         /* glimpse on barrier locks always packs a glimpse descriptor */
291         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK_DESC);
292         desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
293         if (!desc)
294                 GOTO(out, rc = -EPROTO);
295
296         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
297                               sizeof(struct barrier_lvb));
298         rc = req_capsule_server_pack(&req->rq_pill);
299         if (rc)
300                 GOTO(out, rc);
301
302         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
303         barrier = barrier_instance_find(key);
304         if (!barrier)
305                 GOTO(out, rc = -ENODEV);
306
307         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
308         if (rc)
309                 GOTO(out_barrier, rc);
310
311         CDEBUG(D_SNAPSHOT,
312                "%s: handling barrier request: status %u, timeout %u\n",
313                barrier_barrier2name(barrier),
314                desc->lgbd_status, desc->lgbd_timeout);
315
316         switch (desc->lgbd_status) {
317         case BS_RESCAN:
318                 barrier_set(barrier, BS_INIT);
319                 break;
320         case BS_FREEZING_P1:
321         case BS_FREEZING_P2:
322                 if (OBD_FAIL_CHECK(OBD_FAIL_BARRIER_FAILURE))
323                         GOTO(fini, rc = -EINVAL);
324
325                 barrier->bi_deadline = ktime_get_real_seconds() +
326                                        desc->lgbd_timeout;
327                 rc = barrier_freeze(&env, barrier,
328                                     desc->lgbd_status == BS_FREEZING_P1);
329                 break;
330         case BS_THAWING:
331         case BS_FAILED:
332         case BS_EXPIRED:
333                 barrier_set(barrier, BS_THAWED);
334                 break;
335         default:
336                 CWARN("%s: unexpected barrier status %u\n",
337                       barrier_barrier2name(barrier), desc->lgbd_status);
338                 rc = -EINVAL;
339                 break;
340         }
341
342         GOTO(fini, rc);
343
344 fini:
345         lu_env_fini(&env);
346
347 out_barrier:
348         if (rc < 0)
349                 barrier_set(barrier, BS_FAILED);
350         else if (rc > 0)
351                 barrier_set(barrier, BS_EXPIRED);
352
353         lvb->lvb_status = barrier->bi_status;
354         lvb->lvb_index = barrier_dev_idx(barrier);
355
356         CDEBUG(D_SNAPSHOT, "%s: handled barrier request: status %u, "
357                "deadline %lld: rc = %d\n", barrier_barrier2name(barrier),
358                lvb->lvb_status, barrier->bi_deadline, rc);
359
360         barrier_instance_put(barrier);
361         rc = 0;
362
363 out:
364         req->rq_status = rc;
365         return rc;
366 }
367 EXPORT_SYMBOL(barrier_handler);
368
369 int barrier_register(struct dt_device *key, struct dt_device *next)
370 {
371         struct barrier_instance *barrier;
372         int rc;
373         ENTRY;
374
375         OBD_ALLOC_PTR(barrier);
376         if (!barrier)
377                 RETURN(-ENOMEM);
378
379         INIT_LIST_HEAD(&barrier->bi_link);
380         barrier->bi_bottom = key;
381         barrier->bi_next = next;
382         init_waitqueue_head(&barrier->bi_waitq);
383         rwlock_init(&barrier->bi_rwlock);
384         atomic_set(&barrier->bi_ref, 1);
385 #ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
386         rc = percpu_counter_init(&barrier->bi_writers, 0, GFP_KERNEL);
387 #else
388         rc = percpu_counter_init(&barrier->bi_writers, 0);
389 #endif
390         if (rc)
391                 barrier_instance_cleanup(barrier);
392         else
393                 barrier_instance_add(barrier);
394
395         RETURN(rc);
396 }
397 EXPORT_SYMBOL(barrier_register);
398
399 void barrier_deregister(struct dt_device *key)
400 {
401         struct barrier_instance *barrier;
402
403         spin_lock(&barrier_instance_lock);
404         barrier = barrier_instance_find_locked(key);
405         if (barrier)
406                 list_del_init(&barrier->bi_link);
407         spin_unlock(&barrier_instance_lock);
408
409         if (barrier)
410                 barrier_instance_put(barrier);
411 }
412 EXPORT_SYMBOL(barrier_deregister);