Whamcloud - gitweb
7916d204305cfddc10e7e7a8182de7f2cc174232
[fs/lustre-release.git] / lustre / lov / lovsub_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_lock for LOVSUB layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_LOV
42
43 #include "lov_cl_internal.h"
44
45 /** \addtogroup lov
46  *  @{
47  */
48
49 /*****************************************************************************
50  *
51  * Lovsub lock operations.
52  *
53  */
54
55 static void lovsub_lock_fini(const struct lu_env *env,
56                              struct cl_lock_slice *slice)
57 {
58         struct lovsub_lock   *lsl;
59
60         ENTRY;
61         lsl = cl2lovsub_lock(slice);
62         LASSERT(list_empty(&lsl->lss_parents));
63         OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
64         EXIT;
65 }
66
67 static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
68 {
69         struct cl_lock *parent;
70
71         ENTRY;
72         parent = lov->lls_cl.cls_lock;
73         cl_lock_get(parent);
74         lu_ref_add(&parent->cll_reference, "lovsub-parent", current);
75         cl_lock_mutex_get(env, parent);
76         EXIT;
77 }
78
79 static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
80 {
81         struct cl_lock *parent;
82
83         ENTRY;
84         parent = lov->lls_cl.cls_lock;
85         cl_lock_mutex_put(env, lov->lls_cl.cls_lock);
86         lu_ref_del(&parent->cll_reference, "lovsub-parent", current);
87         cl_lock_put(env, parent);
88         EXIT;
89 }
90
91 /**
92  * Implements cl_lock_operations::clo_state() method for lovsub layer, which
93  * method is called whenever sub-lock state changes. Propagates state change
94  * to the top-locks.
95  */
96 static void lovsub_lock_state(const struct lu_env *env,
97                               const struct cl_lock_slice *slice,
98                               enum cl_lock_state state)
99 {
100         struct lovsub_lock   *sub = cl2lovsub_lock(slice);
101         struct lov_lock_link *scan;
102
103         LASSERT(cl_lock_is_mutexed(slice->cls_lock));
104         ENTRY;
105
106         list_for_each_entry(scan, &sub->lss_parents, lll_list) {
107                 struct lov_lock *lov    = scan->lll_super;
108                 struct cl_lock  *parent = lov->lls_cl.cls_lock;
109
110                 if (sub->lss_active != parent) {
111                         lovsub_parent_lock(env, lov);
112                         cl_lock_signal(env, parent);
113                         lovsub_parent_unlock(env, lov);
114                 }
115         }
116         EXIT;
117 }
118
119 /**
120  * Implementation of cl_lock_operation::clo_weigh() estimating lock weight by
121  * asking parent lock.
122  */
123 static unsigned long lovsub_lock_weigh(const struct lu_env *env,
124                                        const struct cl_lock_slice *slice)
125 {
126         struct lovsub_lock *lock = cl2lovsub_lock(slice);
127         struct lov_lock    *lov;
128         unsigned long       dumbbell;
129
130         ENTRY;
131
132         LASSERT(cl_lock_is_mutexed(slice->cls_lock));
133
134         if (!list_empty(&lock->lss_parents)) {
135                 /*
136                  * It is not clear whether all parents have to be asked and
137                  * their estimations summed, or it is enough to ask one. For
138                  * the current usages, one is always enough.
139                  */
140                 lov = container_of(lock->lss_parents.next,
141                                    struct lov_lock_link, lll_list)->lll_super;
142
143                 lovsub_parent_lock(env, lov);
144                 dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
145                 lovsub_parent_unlock(env, lov);
146         } else
147                 dumbbell = 0;
148
149         RETURN(dumbbell);
150 }
151
/**
 * Maps start/end offsets within a stripe, to offsets within a file.
 *
 * \param in     lock extent in stripe-local page offsets
 * \param lov    striped object the sub-lock belongs to
 * \param stripe index of the stripe \a in refers to
 * \param out    receives the file-level cld_start/cld_end; other fields of
 *               \a out are left for the caller to fill
 */
static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
                                  struct lov_object *lov,
                                  int stripe, struct cl_lock_descr *out)
{
        pgoff_t size; /* stripe size in pages */
        pgoff_t skip; /* how many pages in every stripe are occupied by
                       * "other" stripes */
        pgoff_t start;
        pgoff_t end;

        ENTRY;
        start = in->cld_start;
        end   = in->cld_end;

        /* single-striped objects need no mapping: stripe offsets are
         * already file offsets */
        if (lov->lo_lsm->lsm_stripe_count > 1) {
                size = cl_index(lov2cl(lov), lov->lo_lsm->lsm_stripe_size);
                skip = (lov->lo_lsm->lsm_stripe_count - 1) * size;

                /* XXX overflow check here? (note: only 'end' is checked for
                 * overflow below; 'start' is mapped unchecked) */
                start += start/size * skip + stripe * size;

                if (end != CL_PAGE_EOF) {
                        end += end/size * skip + stripe * size;
                        /*
                         * And check for overflow: the mapped end can only
                         * wrap below the input end, in which case clamp the
                         * extent to end-of-file.
                         */
                        if (end < in->cld_end)
                                end = CL_PAGE_EOF;
                }
        }
        out->cld_start = start;
        out->cld_end   = end;
        EXIT;
}
189
190 /**
191  * Adjusts parent lock extent when a sub-lock is attached to a parent. This is
192  * called in two ways:
193  *
194  *     - as part of receive call-back, when server returns granted extent to
195  *       the client, and
196  *
197  *     - when top-lock finds existing sub-lock in the cache.
198  *
199  * Note, that lock mode is not propagated to the parent: i.e., if CLM_READ
200  * top-lock matches CLM_WRITE sub-lock, top-lock is still CLM_READ.
201  */
202 int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
203                        struct lovsub_lock *sublock,
204                        const struct cl_lock_descr *d, int idx)
205 {
206         struct cl_lock       *parent;
207         struct lovsub_object *subobj;
208         struct cl_lock_descr *pd;
209         struct cl_lock_descr *parent_descr;
210         int                   result;
211
212         parent       = lov->lls_cl.cls_lock;
213         parent_descr = &parent->cll_descr;
214         LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));
215
216         subobj = cl2lovsub(sublock->lss_cl.cls_obj);
217         pd     = &lov_env_info(env)->lti_ldescr;
218
219         pd->cld_obj  = parent_descr->cld_obj;
220         pd->cld_mode = parent_descr->cld_mode;
221         pd->cld_gid  = parent_descr->cld_gid;
222         lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
223
224         /* LU-3027: only update extent of lock, plus the change in
225          * lovsub_lock_delete() that lock extent is modified after a sublock
226          * is canceled, we can make sure that the lock extent won't be updated
227          * any more. Therefore, lov_lock_fits_into() will always find feasible
228          * locks */
229         lov->lls_sub[idx].sub_got.cld_start = d->cld_start;
230         lov->lls_sub[idx].sub_got.cld_end = d->cld_end;
231         /*
232          * Notify top-lock about modification, if lock description changes
233          * materially.
234          */
235         if (!cl_lock_ext_match(parent_descr, pd))
236                 result = cl_lock_modify(env, parent, pd);
237         else
238                 result = 0;
239         return result;
240 }
241
242 static int lovsub_lock_modify(const struct lu_env *env,
243                               const struct cl_lock_slice *s,
244                               const struct cl_lock_descr *d)
245 {
246         struct lovsub_lock   *lock   = cl2lovsub_lock(s);
247         struct lov_lock_link *scan;
248         struct lov_lock      *lov;
249         int result                   = 0;
250
251         ENTRY;
252
253         LASSERT(cl_lock_mode_match(d->cld_mode,
254                                    s->cls_lock->cll_descr.cld_mode));
255         list_for_each_entry(scan, &lock->lss_parents, lll_list) {
256                 int rc;
257
258                 lov = scan->lll_super;
259                 lovsub_parent_lock(env, lov);
260                 rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
261                 lovsub_parent_unlock(env, lov);
262                 result = result ?: rc;
263         }
264         RETURN(result);
265 }
266
267 static int lovsub_lock_closure(const struct lu_env *env,
268                                const struct cl_lock_slice *slice,
269                                struct cl_lock_closure *closure)
270 {
271         struct lovsub_lock   *sub;
272         struct cl_lock       *parent;
273         struct lov_lock_link *scan;
274         int                   result;
275
276         LASSERT(cl_lock_is_mutexed(slice->cls_lock));
277         ENTRY;
278
279         sub    = cl2lovsub_lock(slice);
280         result = 0;
281
282         list_for_each_entry(scan, &sub->lss_parents, lll_list) {
283                 parent = scan->lll_super->lls_cl.cls_lock;
284                 result = cl_lock_closure_build(env, parent, closure);
285                 if (result != 0)
286                         break;
287         }
288         RETURN(result);
289 }
290
/**
 * A helper function for lovsub_lock_delete() that deals with a given parent
 * top-lock.
 *
 * Called with both \a child (the sub-lock) and the parent top-lock mutexes
 * held (the latter taken by the caller via lovsub_parent_lock()).
 *
 * \retval 1 the child mutex was released (top-lock was proactively
 *           destroyed); the caller must re-acquire it and rescan its
 *           parent list
 * \retval 0 otherwise
 */
static int lovsub_lock_delete_one(const struct lu_env *env,
                                  struct cl_lock *child, struct lov_lock *lov)
{
        struct cl_lock *parent;
        int             result;
        ENTRY;

        parent = lov->lls_cl.cls_lock;
        /* nothing to propagate to an already-failed top-lock */
        if (parent->cll_error)
                RETURN(0);

        result = 0;
        lov->lls_ever_canceled = 1;
        switch (parent->cll_state) {
        case CLS_ENQUEUED:
                /* See LU-1355 for the case that a glimpse lock is
                 * interrupted by signal */
                LASSERT(parent->cll_flags & CLF_CANCELLED);
                break;
        case CLS_QUEUING:
        case CLS_FREEING:
                /* wake up waiters on the top-lock so they notice the
                 * sub-lock went away */
                cl_lock_signal(env, parent);
                break;
        case CLS_INTRANSIT:
                /*
                 * Here lies a problem: a sub-lock is canceled while top-lock
                 * is being unlocked. Top-lock cannot be moved into CLS_NEW
                 * state, because unlocking has to succeed eventually by
                 * placing lock into CLS_CACHED (or failing it), see
                 * cl_unuse_try(). Nor can top-lock be left in CLS_CACHED
                 * state, because lov maintains an invariant that all
                 * sub-locks exist in CLS_CACHED (this allows cached top-lock
                 * to be reused immediately). Nor can we wait for top-lock
                 * state to change, because this can be synchronous to the
                 * current thread.
                 *
                 * We know for sure that lov_lock_unuse() will be called at
                 * least one more time to finish un-using, so leave a mark on
                 * the top-lock, that will be seen by the next call to
                 * lov_lock_unuse().
                 */
                if (cl_lock_is_intransit(parent))
                        lov->lls_cancel_race = 1;
                break;
        case CLS_CACHED:
                /*
                 * if a sub-lock is canceled move its top-lock into CLS_NEW
                 * state to preserve an invariant that a top-lock in
                 * CLS_CACHED is immediately ready for re-use (i.e., has all
                 * sub-locks), and so that next attempt to re-use the top-lock
                 * enqueues missing sub-lock.
                 */
                cl_lock_state_set(env, parent, CLS_NEW);
                /* fall through */
        case CLS_NEW:
                /*
                 * if last sub-lock is canceled, destroy the top-lock (which
                 * is now `empty') proactively.
                 */
                if (lov->lls_nr_filled == 0) {
                        /* ... but unfortunately, this cannot be done easily,
                         * as cancellation of a top-lock might acquire mutices
                         * of its other sub-locks, violating lock ordering,
                         * see cl_lock_{cancel,delete}() preconditions.
                         *
                         * To work around this, the mutex of this sub-lock is
                         * released, top-lock is destroyed, and sub-lock mutex
                         * acquired again. The list of parents has to be
                         * re-scanned from the beginning after this.
                         *
                         * Only do this if no mutices other than on @child and
                         * @parent are held by the current thread.
                         *
                         * TODO: The locking model here is too complex, because
                         * the lock may be canceled and deleted voluntarily:
                         *    cl_lock_request
                         *      -> osc_lock_enqueue_wait
                         *        -> osc_lock_cancel_wait
                         *          -> cl_lock_delete
                         *            -> lovsub_lock_delete
                         *              -> cl_lock_cancel/delete
                         *                -> ...
                         *
                         * The better choice is to spawn a kernel thread for
                         * this purpose. -jay
                         */
                        if (cl_lock_nr_mutexed(env) == 2) {
                                cl_lock_mutex_put(env, child);
                                cl_lock_cancel(env, parent);
                                cl_lock_delete(env, parent);
                                /* tell the caller the child mutex was
                                 * dropped and the parent list must be
                                 * rescanned */
                                result = 1;
                        }
                }
                break;
        case CLS_HELD:
                /* deleting a sub-lock of an actively held top-lock is a bug;
                 * log it and fall into LBUG() below */
                CL_LOCK_DEBUG(D_ERROR, env, parent, "Delete CLS_HELD lock\n");
                /* falling through */
        default:
                CERROR("Impossible state: %d\n", parent->cll_state);
                LBUG();
                break;
        }

        RETURN(result);
}
400
/**
 * An implementation of cl_lock_operations::clo_delete() method. This is
 * invoked in "bottom-to-top" delete, when lock destruction starts from the
 * sub-lock (e.g, as a result of ldlm lock LRU policy).
 *
 * Unlinks the sub-lock from every parent top-lock, letting
 * lovsub_lock_delete_one() adjust or destroy each parent as needed.
 */
static void lovsub_lock_delete(const struct lu_env *env,
                               const struct cl_lock_slice *slice)
{
        struct cl_lock     *child = slice->cls_lock;
        struct lovsub_lock *sub   = cl2lovsub_lock(slice);
        int restart;

        LASSERT(cl_lock_is_mutexed(child));

        ENTRY;
        /*
         * Destruction of a sub-lock might take multiple iterations, because
         * when the last sub-lock of a given top-lock is deleted, top-lock is
         * canceled proactively, and this requires to release sub-lock
         * mutex. Once sub-lock mutex has been released, list of its parents
         * has to be re-scanned from the beginning.
         */
        do {
                struct lov_lock      *lov;
                struct lov_lock_link *scan;
                struct lov_lock_link *temp;

                restart = 0;
                /* _safe variant: lov_lock_unlink() removes 'scan' from the
                 * list while we iterate */
                list_for_each_entry_safe(scan, temp,
                                             &sub->lss_parents, lll_list) {
                        lov     = scan->lll_super;
                        lovsub_parent_lock(env, lov);
                        lov_lock_unlink(env, scan, sub);
                        restart = lovsub_lock_delete_one(env, child, lov);
                        lovsub_parent_unlock(env, lov);

                        if (restart) {
                                /* delete_one dropped the child mutex;
                                 * re-take it and rescan from the start */
                                cl_lock_mutex_get(env, child);
                                break;
                        }
                }
        } while (restart);
        EXIT;
}
445
446 static int lovsub_lock_print(const struct lu_env *env, void *cookie,
447                              lu_printer_t p, const struct cl_lock_slice *slice)
448 {
449         struct lovsub_lock   *sub = cl2lovsub_lock(slice);
450         struct lov_lock      *lov;
451         struct lov_lock_link *scan;
452
453         list_for_each_entry(scan, &sub->lss_parents, lll_list) {
454                 lov = scan->lll_super;
455                 (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
456                 if (lov != NULL)
457                         cl_lock_descr_print(env, cookie, p,
458                                             &lov->lls_cl.cls_lock->cll_descr);
459                 (*p)(env, cookie, "] ");
460         }
461         return 0;
462 }
463
/** Lock operations vector installed on lovsub lock slices by
 * lovsub_lock_init(). */
static const struct cl_lock_operations lovsub_lock_ops = {
        .clo_fini    = lovsub_lock_fini,
        .clo_state   = lovsub_lock_state,
        .clo_delete  = lovsub_lock_delete,
        .clo_modify  = lovsub_lock_modify,
        .clo_closure = lovsub_lock_closure,
        .clo_weigh   = lovsub_lock_weigh,
        .clo_print   = lovsub_lock_print
};
473
474 int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
475                      struct cl_lock *lock, const struct cl_io *io)
476 {
477         struct lovsub_lock *lsk;
478         int result;
479
480         ENTRY;
481         OBD_SLAB_ALLOC_PTR_GFP(lsk, lovsub_lock_kmem, GFP_NOFS);
482         if (lsk != NULL) {
483                 INIT_LIST_HEAD(&lsk->lss_parents);
484                 cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
485                 result = 0;
486         } else
487                 result = -ENOMEM;
488         RETURN(result);
489 }
490
491 /** @} lov */