/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOVSUB layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov
 *  @{
 */

/*****************************************************************************
 *
 * Lovsub lock operations.
 *
 */

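/*
 * Overview: a lovsub_lock is the lock slice living on a single stripe
 * (sub-)object. Each sub-lock keeps ->lss_parents, a list of
 * lov_lock_link structures pointing back at the top-locks (struct
 * lov_lock) that share it, so every operation in this file is
 * essentially "walk the parent list and propagate the event (state
 * change, modification, deletion) upward".
 */
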
static void lovsub_lock_fini(const struct lu_env *env,
                             struct cl_lock_slice *slice)
{
        struct lovsub_lock   *lsl;

        ENTRY;
        lsl = cl2lovsub_lock(slice);
        LASSERT(list_empty(&lsl->lss_parents));
        OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
        EXIT;
}

static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        cl_lock_get(parent);
        lu_ref_add(&parent->cll_reference, "lovsub-parent", cfs_current());
        cl_lock_mutex_get(env, parent);
        EXIT;
}

static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        cl_lock_mutex_put(env, parent);
        lu_ref_del(&parent->cll_reference, "lovsub-parent", cfs_current());
        cl_lock_put(env, parent);
        EXIT;
}
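
/*
 * Note on ordering: lovsub_parent_lock() is entered with the sub-lock
 * (child) mutex already held (see the cl_lock_is_mutexed() assertions in
 * lovsub_lock_state(), lovsub_lock_weigh() and lovsub_lock_delete()), and
 * takes the parent mutex on top of it. The cl_lock_get()/lu_ref_add() pair
 * pins the parent across the critical section; lovsub_parent_unlock()
 * releases everything in the reverse order.
 */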

static int lovsub_lock_state_one(const struct lu_env *env,
                                 const struct lovsub_lock *lovsub,
                                 struct lov_lock *lov)
{
        struct cl_lock *parent;
        struct cl_lock *child;
        int             restart = 0;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        child  = lovsub->lss_cl.cls_lock;

        if (lovsub->lss_active != parent) {
                lovsub_parent_lock(env, lov);
                if (child->cll_error != 0 && parent->cll_error == 0) {
                        /*
                         * This is a deadlock case:
                         * cl_lock_error(for the parent lock)
                         *   -> cl_lock_delete
                         *     -> lov_lock_delete
                         *       -> cl_lock_enclosure
                         *         -> cl_lock_mutex_try(for the child lock)
                         */
                        cl_lock_mutex_put(env, child);
                        cl_lock_error(env, parent, child->cll_error);
                        restart = 1;
                } else {
                        cl_lock_signal(env, parent);
                }
                lovsub_parent_unlock(env, lov);
        }
        RETURN(restart);
}

/**
 * Implements the cl_lock_operations::clo_state() method for the lovsub
 * layer. This method is called whenever a sub-lock state changes, and it
 * propagates the state change to the top-locks.
 */
static void lovsub_lock_state(const struct lu_env *env,
                              const struct cl_lock_slice *slice,
                              enum cl_lock_state state)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock_link *scan;
        int                   restart = 0;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
        ENTRY;

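        /*
         * When lovsub_lock_state_one() signals a restart it has dropped the
         * sub-lock mutex (see the deadlock comment there), so re-acquire
         * the mutex and re-scan the parent list from the beginning.
         */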
        do {
                restart = 0;
                list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                        restart = lovsub_lock_state_one(env, sub,
                                                        scan->lll_super);
                        if (restart) {
                                cl_lock_mutex_get(env, slice->cls_lock);
                                break;
                        }
                }
        } while (restart);
        EXIT;
}

/**
 * Implementation of cl_lock_operations::clo_weigh() estimating lock weight
 * by asking the parent lock.
 */
static unsigned long lovsub_lock_weigh(const struct lu_env *env,
                                       const struct cl_lock_slice *slice)
{
        struct lovsub_lock *lock = cl2lovsub_lock(slice);
        struct lov_lock    *lov;
        unsigned long       dumbbell;

        ENTRY;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));

        if (!list_empty(&lock->lss_parents)) {
                /*
                 * It is not clear whether all parents have to be asked and
                 * their estimations summed, or it is enough to ask one. For
                 * the current usages, one is always enough.
                 */
                lov = container_of(lock->lss_parents.next,
                                   struct lov_lock_link, lll_list)->lll_super;

                lovsub_parent_lock(env, lov);
                dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
                lovsub_parent_unlock(env, lov);
        } else
                dumbbell = 0;

        RETURN(dumbbell);
}

/**
 * Maps start/end offsets within a stripe to offsets within a file.
 */
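/*
 * Worked example (illustrative values only, not taken from this file):
 * with lsm_stripe_count == 3 and a stripe size of 16 pages, page 20 of
 * stripe 1 sits in that stripe's second chunk (20 / 16 == 1). Here
 * skip == (3 - 1) * 16 == 32, so the file-level index becomes
 * 20 + 1 * 32 + 1 * 16 == 68, i.e. page 4 of the file's fifth chunk,
 * which indeed belongs to stripe 1 (68 / 16 == 4, 4 % 3 == 1).
 */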
static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
                                  struct lov_object *obj,
                                  int stripe, struct cl_lock_descr *out)
{
        struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
        pgoff_t size; /* stripe size in pages */
        pgoff_t skip; /* how many pages in every stripe are occupied by
                       * "other" stripes */
        pgoff_t start;
        pgoff_t end;

        ENTRY;
        start = in->cld_start;
        end   = in->cld_end;

        /*
         * XXX join file support.
         */
        if (lsm->lsm_stripe_count > 1) {
                size = cl_index(lov2cl(obj), lsm->lsm_stripe_size);
                skip = (lsm->lsm_stripe_count - 1) * size;

                /* XXX overflow check here? */
                start += start/size * skip + stripe * size;

                if (end != CL_PAGE_EOF) {
                        end += end/size * skip + stripe * size;
                        /*
                         * And check for overflow...
                         */
                        if (end < in->cld_end)
                                end = CL_PAGE_EOF;
                }
        }
        out->cld_start = start;
        out->cld_end   = end;
        EXIT;
}

/**
 * Adjusts the parent lock extent when a sub-lock is attached to a parent.
 * This is called in two ways:
 *
 *     - as part of the receive call-back, when the server returns a granted
 *       extent to the client, and
 *
 *     - when a top-lock finds an existing sub-lock in the cache.
 *
 * Note that the lock mode is not propagated to the parent: i.e., if a
 * CLM_READ top-lock matches a CLM_WRITE sub-lock, the top-lock remains
 * CLM_READ.
 */
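/*
 * The "receive call-back" case matters because the extent granted by the
 * server may be wider than what was requested. The granted stripe extent
 * is mapped back into file coordinates by lovsub_lock_descr_map() and,
 * when the result no longer matches the parent's description, the parent
 * is updated through cl_lock_modify().
 */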
int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
                       struct lovsub_lock *sublock,
                       const struct cl_lock_descr *d, int idx)
{
        struct cl_lock       *parent;
        struct cl_lock       *child;
        struct lovsub_object *subobj;
        struct cl_lock_descr *pd;
        struct cl_lock_descr *parent_descr;
        int                   result;

        parent       = lov->lls_cl.cls_lock;
        parent_descr = &parent->cll_descr;
        LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));

        child  = sublock->lss_cl.cls_lock;
        subobj = cl2lovsub(sublock->lss_cl.cls_obj);
        pd     = &lov_env_info(env)->lti_ldescr;

        pd->cld_obj  = parent_descr->cld_obj;
        pd->cld_mode = parent_descr->cld_mode;
        pd->cld_gid  = parent_descr->cld_gid;
        lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
        lov->lls_sub[idx].sub_got = *d;
        /*
         * Notify top-lock about modification, if lock description changes
         * materially.
         */
        if (!cl_lock_ext_match(parent_descr, pd))
                result = cl_lock_modify(env, parent, pd);
        else
                result = 0;
        return result;
}

static int lovsub_lock_modify(const struct lu_env *env,
                              const struct cl_lock_slice *s,
                              const struct cl_lock_descr *d)
{
        struct lovsub_lock   *lock   = cl2lovsub_lock(s);
        struct lov_lock_link *scan;
        struct lov_lock      *lov;
        int result                   = 0;

        ENTRY;

        LASSERT(cl_lock_mode_match(d->cld_mode,
                                   s->cls_lock->cll_descr.cld_mode));
        list_for_each_entry(scan, &lock->lss_parents, lll_list) {
                int rc;

                lov = scan->lll_super;
                lovsub_parent_lock(env, lov);
                rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
                lovsub_parent_unlock(env, lov);
                result = result ?: rc;
        }
        RETURN(result);
}

static int lovsub_lock_closure(const struct lu_env *env,
                               const struct cl_lock_slice *slice,
                               struct cl_lock_closure *closure)
{
        struct lovsub_lock   *sub;
        struct cl_lock       *parent;
        struct lov_lock_link *scan;
        int                   result;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
        ENTRY;

        sub    = cl2lovsub_lock(slice);
        result = 0;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                parent = scan->lll_super->lls_cl.cls_lock;
                result = cl_lock_closure_build(env, parent, closure);
                if (result != 0)
                        break;
        }
        RETURN(result);
}

/**
 * A helper function for lovsub_lock_delete() that deals with a given parent
 * top-lock.
 */
static int lovsub_lock_delete_one(const struct lu_env *env,
                                  struct cl_lock *child, struct lov_lock *lov)
{
        struct cl_lock *parent;
        int             result;
        ENTRY;

        parent = lov->lls_cl.cls_lock;
        result = 0;

        switch (parent->cll_state) {
        case CLS_NEW:
        case CLS_QUEUING:
        case CLS_ENQUEUED:
        case CLS_FREEING:
                cl_lock_signal(env, parent);
                break;
        case CLS_UNLOCKING:
                /*
                 * Here lies a problem: a sub-lock is canceled while top-lock
                 * is being unlocked. Top-lock cannot be moved into CLS_NEW
                 * state, because unlocking has to succeed eventually by
                 * placing lock into CLS_CACHED (or failing it), see
                 * cl_unuse_try(). Nor can top-lock be left in CLS_CACHED
                 * state, because lov maintains an invariant that all
                 * sub-locks exist in CLS_CACHED (this allows cached top-lock
                 * to be reused immediately). Nor can we wait for top-lock
                 * state to change, because this can be synchronous to the
                 * current thread.
                 *
                 * We know for sure that lov_lock_unuse() will be called at
                 * least one more time to finish un-using, so leave a mark on
                 * the top-lock, that will be seen by the next call to
                 * lov_lock_unuse().
                 */
                lov->lls_unuse_race = 1;
                break;
        case CLS_CACHED:
                /*
                 * if a sub-lock is canceled move its top-lock into CLS_NEW
                 * state to preserve an invariant that a top-lock in
                 * CLS_CACHED is immediately ready for re-use (i.e., has all
                 * sub-locks), and so that next attempt to re-use the top-lock
                 * enqueues missing sub-lock.
                 */
                cl_lock_state_set(env, parent, CLS_NEW);
                /*
                 * if last sub-lock is canceled, destroy the top-lock (which
                 * is now `empty') proactively.
                 */
                if (lov->lls_nr_filled == 0) {
                        /* ... but unfortunately, this cannot be done easily,
                         * as cancellation of a top-lock might acquire mutexes
                         * of its other sub-locks, violating lock ordering,
                         * see cl_lock_{cancel,delete}() preconditions.
                         *
                         * To work around this, the mutex of this sub-lock is
                         * released, top-lock is destroyed, and sub-lock mutex
                         * acquired again. The list of parents has to be
                         * re-scanned from the beginning after this.
                         *
                         * Only do this if no mutexes other than on @child and
                         * @parent are held by the current thread.
                         *
                         * TODO: The locking model here is too complex, because
                         * the lock may be canceled and deleted voluntarily:
                         *    cl_lock_request
                         *      -> osc_lock_enqueue_wait
                         *        -> osc_lock_cancel_wait
                         *          -> cl_lock_delete
                         *            -> lovsub_lock_delete
                         *              -> cl_lock_cancel/delete
                         *                -> ...
                         *
                         * The better choice is to spawn a kernel thread for
                         * this purpose. -jay
                         */
                        if (cl_lock_nr_mutexed(env) == 2) {
                                cl_lock_mutex_put(env, child);
                                cl_lock_cancel(env, parent);
                                cl_lock_delete(env, parent);
                                result = 1;
                        }
                }
                break;
        case CLS_HELD:
        default:
                CERROR("Impossible state: %i\n", parent->cll_state);
                LBUG();
        }

        RETURN(result);
}

/**
 * An implementation of the cl_lock_operations::clo_delete() method. This is
 * invoked in "bottom-to-top" delete, when lock destruction starts from the
 * sub-lock (e.g., as a result of the ldlm lock LRU policy).
 */
static void lovsub_lock_delete(const struct lu_env *env,
                               const struct cl_lock_slice *slice)
{
        struct cl_lock     *child = slice->cls_lock;
        struct lovsub_lock *sub   = cl2lovsub_lock(slice);
        int restart;

        LASSERT(cl_lock_is_mutexed(child));

        ENTRY;
        /*
         * Destruction of a sub-lock might take multiple iterations, because
         * when the last sub-lock of a given top-lock is deleted, the top-lock
         * is canceled proactively, and this requires releasing the sub-lock
         * mutex. Once the sub-lock mutex has been released, the list of its
         * parents has to be re-scanned from the beginning.
         */
        do {
                struct lov_lock      *lov;
                struct lov_lock_link *scan;
                struct lov_lock_link *temp;
                struct lov_lock_sub  *subdata;

                restart = 0;
                list_for_each_entry_safe(scan, temp,
                                         &sub->lss_parents, lll_list) {
                        lov     = scan->lll_super;
                        subdata = &lov->lls_sub[scan->lll_idx];
                        lovsub_parent_lock(env, lov);
                        subdata->sub_got = subdata->sub_descr;
                        lov_lock_unlink(env, scan, sub);
                        restart = lovsub_lock_delete_one(env, child, lov);
                        lovsub_parent_unlock(env, lov);

                        if (restart) {
                                cl_lock_mutex_get(env, child);
                                break;
                        }
                }
        } while (restart);
        EXIT;
}

static int lovsub_lock_print(const struct lu_env *env, void *cookie,
                             lu_printer_t p, const struct cl_lock_slice *slice)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock      *lov;
        struct lov_lock_link *scan;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                lov = scan->lll_super;
                (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
                if (lov != NULL)
                        cl_lock_descr_print(env, cookie, p,
                                            &lov->lls_cl.cls_lock->cll_descr);
                (*p)(env, cookie, "] ");
        }
        return 0;
}

static const struct cl_lock_operations lovsub_lock_ops = {
        .clo_fini    = lovsub_lock_fini,
        .clo_state   = lovsub_lock_state,
        .clo_delete  = lovsub_lock_delete,
        .clo_modify  = lovsub_lock_modify,
        .clo_closure = lovsub_lock_closure,
        .clo_weigh   = lovsub_lock_weigh,
        .clo_print   = lovsub_lock_print
};

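/**
 * Allocates the lovsub slice of a cl_lock from lovsub_lock_kmem and
 * composes it into the lock via cl_lock_slice_add(), attaching the
 * operation vector defined above.
 */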
int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
                     struct cl_lock *lock, const struct cl_io *io)
{
        struct lovsub_lock *lsk;
        int result;

        ENTRY;
        OBD_SLAB_ALLOC_PTR_GFP(lsk, lovsub_lock_kmem, CFS_ALLOC_IO);
        if (lsk != NULL) {
                CFS_INIT_LIST_HEAD(&lsk->lss_parents);
                cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
                result = 0;
        } else
                result = -ENOMEM;
        RETURN(result);
}

/** @} lov */