1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Implementation of cl_lock for LOVSUB layer.
38 * Author: Nikita Danilov <nikita.danilov@sun.com>
41 #define DEBUG_SUBSYSTEM S_LOV
43 #include "lov_cl_internal.h"
45 /** \addtogroup lov lov @{ */
47 /*****************************************************************************
49 * Lovsub lock operations.
/*
 * cl_lock_operations::clo_fini() for the lovsub layer: releases the
 * per-lock slice back to the lovsub_lock_kmem slab cache.
 *
 * The LASSERT checks that every parent lov_lock link has already been
 * detached (lss_parents empty) before the slice is freed.
 *
 * NOTE(review): embedded line numbers jump (54 -> 56 -> 59) — braces and
 * some lines were dropped by extraction; this is not the full body.
 */
53 static void lovsub_lock_fini(const struct lu_env *env,
54 struct cl_lock_slice *slice)
56 struct lovsub_lock *lsl;
59 lsl = cl2lovsub_lock(slice);
/* No top-lock may still reference this sub-lock slice. */
60 LASSERT(list_empty(&lsl->lss_parents));
61 OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
/*
 * Pins and mutex-locks the parent (top) cl_lock of a lov_lock slice:
 * records a lu_ref debugging reference on the parent keyed by the
 * current thread, then acquires the parent lock mutex.
 *
 * Paired with lovsub_parent_unlock() below, which drops the mutex and
 * the reference.
 *
 * NOTE(review): line numbering jumps (65 -> 67 -> 70) — some original
 * lines (likely braces/assertions) are missing from this view.
 */
65 static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
67 struct cl_lock *parent;
70 parent = lov->lls_cl.cls_lock;
72 lu_ref_add(&parent->cll_reference, "lovsub-parent", cfs_current());
73 cl_lock_mutex_get(env, parent);
77 static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
79 struct cl_lock *parent;
82 parent = lov->lls_cl.cls_lock;
83 cl_lock_mutex_put(env, lov->lls_cl.cls_lock);
84 lu_ref_del(&parent->cll_reference, "lovsub-parent", cfs_current());
85 cl_lock_put(env, parent);
/*
 * Propagates a sub-lock state change to one parent top-lock.
 *
 * NOTE(review): the signature jumps from line 90 to 93 — a third
 * parameter (presumably `struct lov_lock *lov`, used below) was dropped
 * by extraction, as were the declaration and return of the restart
 * result.  Treat this view as incomplete.
 *
 * The lss_active check skips the parent that is itself driving the
 * current state change, to avoid re-entering it.
 */
89 static int lovsub_lock_state_one(const struct lu_env *env,
90 const struct lovsub_lock *lovsub,
93 struct cl_lock *parent;
94 struct cl_lock *child;
98 parent = lov->lls_cl.cls_lock;
99 child = lovsub->lss_cl.cls_lock;
/* Only notify parents that are not the one actively operating on us. */
101 if (lovsub->lss_active != parent) {
102 lovsub_parent_lock(env, lov);
/* Propagate a child error to the parent exactly once. */
103 if (child->cll_error != 0 && parent->cll_error == 0) {
105 * This is a deadlock case:
106 * cl_lock_error(for the parent lock)
109 * -> cl_lock_enclosure
110 * -> cl_lock_mutex_try(for the child lock)
/* Drop the child mutex before erroring the parent to break the
 * lock-ordering cycle described in the comment above. */
112 cl_lock_mutex_put(env, child);
113 cl_lock_error(env, parent, child->cll_error);
116 cl_lock_signal(env, parent);
118 lovsub_parent_unlock(env, lov);
/*
 * cl_lock_operations::clo_state() for lovsub: called whenever the
 * sub-lock state changes; walks lss_parents and notifies each attached
 * top-lock via lovsub_lock_state_one().
 *
 * NOTE(review): the declaration/use of `restart` and the loop-restart
 * handling (the child mutex may be dropped inside
 * lovsub_lock_state_one(), hence the re-acquisition at line 145) are
 * only partially visible — lines are missing from this extraction.
 */
124 * Implements cl_lock_operations::clo_state() method for lovsub layer, which
125 * method is called whenever sub-lock state changes. Propagates state change
128 static void lovsub_lock_state(const struct lu_env *env,
129 const struct cl_lock_slice *slice,
130 enum cl_lock_state state)
132 struct lovsub_lock *sub = cl2lovsub_lock(slice);
133 struct lov_lock_link *scan;
136 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
141 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
142 restart = lovsub_lock_state_one(env, sub,
/* Re-take our own mutex: state_one may have dropped it to avoid
 * child->parent mutex ordering deadlock. */
145 cl_lock_mutex_get(env, slice->cls_lock);
/*
 * cl_lock_operations::clo_weigh() for lovsub: the sub-lock delegates
 * weight estimation to (one of) its parent top-lock(s).
 *
 * NOTE(review): the else-branch (no parents) and the return of
 * `dumbbell` are missing from this extraction (line jump 179 -> 187).
 */
154 * Implementation of cl_lock_operation::clo_weigh() estimating lock weight by
155 * asking parent lock.
157 static unsigned long lovsub_lock_weigh(const struct lu_env *env,
158 const struct cl_lock_slice *slice)
160 struct lovsub_lock *lock = cl2lovsub_lock(slice);
161 struct lov_lock *lov;
162 unsigned long dumbbell;
166 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
168 if (!list_empty(&lock->lss_parents)) {
170 * It is not clear whether all parents have to be asked and
171 * their estimations summed, or it is enough to ask one. For
172 * the current usages, one is always enough.
/* Take the first parent link on the list and weigh its top-lock. */
174 lov = container_of(lock->lss_parents.next,
175 struct lov_lock_link, lll_list)->lll_super;
177 lovsub_parent_lock(env, lov);
178 dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
179 lovsub_parent_unlock(env, lov);
/*
 * Converts a lock extent expressed in stripe-local page offsets (@in,
 * for stripe @stripe of @obj) into file-logical page offsets (@out),
 * using the RAID0 striping parameters from the lov stripe md.
 *
 * NOTE(review): declarations/initialization of `start` and `end`, the
 * single-stripe shortcut, the overflow clamp (end -> CL_PAGE_EOF?), and
 * the assignment of out->cld_end are missing from this extraction —
 * line numbers jump around 195-201 and 219-223.
 */
187 * Maps start/end offsets within a stripe, to offsets within a file.
189 static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
190 struct lov_object *obj,
191 int stripe, struct cl_lock_descr *out)
193 struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
194 pgoff_t size; /* stripe size in pages */
195 pgoff_t skip; /* how many pages in every stripe are occupied by
201 start = in->cld_start;
205 * XXX join file support.
207 if (lsm->lsm_stripe_count > 1) {
/* Stripe size in pages; `skip` is the span of the other stripes
 * that sits between two consecutive chunks of this stripe. */
208 size = cl_index(lov2cl(obj), lsm->lsm_stripe_size);
209 skip = (lsm->lsm_stripe_count - 1) * size;
211 /* XXX overflow check here? */
212 start += start/size * skip + stripe * size;
/* CL_PAGE_EOF (lock-to-EOF) must stay EOF in file coordinates. */
214 if (end != CL_PAGE_EOF) {
215 end += end/size * skip + stripe * size;
217 * And check for overflow...
219 if (end < in->cld_end)
223 out->cld_start = start;
/*
 * Folds a (new) sub-lock description @d at stripe index @idx into the
 * parent top-lock: maps the stripe-local extent to file coordinates and,
 * if the mapped extent no longer matches the parent's, calls
 * cl_lock_modify() on the parent.
 *
 * NOTE(review): declaration of `result` and the final return are missing
 * from this extraction (line jump 269 onward).
 */
229 * Adjusts parent lock extent when a sub-lock is attached to a parent. This is
230 * called in two ways:
232 * - as part of receive call-back, when server returns granted extent to
235 * - when top-lock finds existing sub-lock in the cache.
237 * Note, that lock mode is not propagated to the parent: i.e., if CLM_READ
238 * top-lock matches CLM_WRITE sub-lock, top-lock is still CLM_READ.
240 int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
241 struct lovsub_lock *sublock,
242 const struct cl_lock_descr *d, int idx)
244 struct cl_lock *parent;
245 struct cl_lock *child;
246 struct lovsub_object *subobj;
247 struct cl_lock_descr *pd;
248 struct cl_lock_descr *parent_descr;
251 parent = lov->lls_cl.cls_lock;
252 parent_descr = &parent->cll_descr;
/* Modes need not be equal, only compatible (see header comment). */
253 LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));
255 child = sublock->lss_cl.cls_lock;
256 subobj = cl2lovsub(sublock->lss_cl.cls_obj);
/* Scratch descriptor from per-env lov info — no allocation needed. */
257 pd = &lov_env_info(env)->lti_ldescr;
259 pd->cld_obj = parent_descr->cld_obj;
260 pd->cld_mode = parent_descr->cld_mode;
261 pd->cld_gid = parent_descr->cld_gid;
/* Translate stripe-local extent @d into file-logical extent in pd. */
262 lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
/* Remember the extent actually granted for this stripe. */
263 lov->lls_sub[idx].sub_got = *d;
265 * Notify top-lock about modification, if lock description changes
268 if (!cl_lock_ext_match(parent_descr, pd))
269 result = cl_lock_modify(env, parent, pd);
/*
 * cl_lock_operations::clo_modify() for lovsub: when the sub-lock's
 * description changes, pushes the new description @d into every parent
 * top-lock via lov_sublock_modify(), accumulating the first non-zero
 * error in `result`.
 *
 * NOTE(review): declarations of `result`/`rc` and the final return are
 * missing from this extraction (line jumps 281 -> 286, 288 -> 291).
 */
275 static int lovsub_lock_modify(const struct lu_env *env,
276 const struct cl_lock_slice *s,
277 const struct cl_lock_descr *d)
279 struct lovsub_lock *lock = cl2lovsub_lock(s);
280 struct lov_lock_link *scan;
281 struct lov_lock *lov;
286 LASSERT(cl_lock_mode_match(d->cld_mode,
287 s->cls_lock->cll_descr.cld_mode));
288 list_for_each_entry(scan, &lock->lss_parents, lll_list) {
291 lov = scan->lll_super;
292 lovsub_parent_lock(env, lov);
293 rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
294 lovsub_parent_unlock(env, lov);
/* Keep the first error, but keep notifying remaining parents. */
295 result = result ?: rc;
/*
 * cl_lock_operations::clo_closure() for lovsub: extends a lock closure
 * with every parent top-lock of this sub-lock by calling
 * cl_lock_closure_build() on each.
 *
 * NOTE(review): the result-accumulation / early-exit handling after line
 * 317 and the return statement are missing from this extraction.
 */
300 static int lovsub_lock_closure(const struct lu_env *env,
301 const struct cl_lock_slice *slice,
302 struct cl_lock_closure *closure)
304 struct lovsub_lock *sub;
305 struct cl_lock *parent;
306 struct lov_lock_link *scan;
309 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
312 sub = cl2lovsub_lock(slice);
315 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
316 parent = scan->lll_super->lls_cl.cls_lock;
317 result = cl_lock_closure_build(env, parent, closure);
/*
 * Handles deletion of sub-lock @child with respect to one parent
 * top-lock @lov: reacts according to the parent's current state
 * (signal waiters, flag an unuse race, reset to CLS_NEW, or proactively
 * cancel/delete an emptied top-lock).
 *
 * NOTE(review): several switch case labels, the declaration of the
 * return value, and the trailing statements are missing from this
 * extraction (line jumps 335 -> 338, 343 -> 347, 408 -> 415) — the
 * visible code is a partial view of the original state machine.
 */
328 static int lovsub_lock_delete_one(const struct lu_env *env,
329 struct cl_lock *child, struct lov_lock *lov)
331 struct cl_lock *parent;
335 parent = lov->lls_cl.cls_lock;
338 switch (parent->cll_state) {
/* Wake anything sleeping on the parent's state. */
343 cl_lock_signal(env, parent);
347 * Here lies a problem: a sub-lock is canceled while top-lock
348 * is being unlocked. Top-lock cannot be moved into CLS_NEW
349 * state, because unlocking has to succeed eventually by
350 * placing lock into CLS_CACHED (or failing it), see
351 * cl_unuse_try(). Nor can top-lock be left in CLS_CACHED
352 * state, because lov maintains an invariant that all
353 * sub-locks exist in CLS_CACHED (this allows cached top-lock
354 * to be reused immediately). Nor can we wait for top-lock
355 * state to change, because this can be synchronous to the
358 * We know for sure that lov_lock_unuse() will be called at
359 * least one more time to finish un-using, so leave a mark on
360 * the top-lock, that will be seen by the next call to
363 lov->lls_unuse_race = 1;
367 * if a sub-lock is canceled move its top-lock into CLS_NEW
368 * state to preserve an invariant that a top-lock in
369 * CLS_CACHED is immediately ready for re-use (i.e., has all
370 * sub-locks), and so that next attempt to re-use the top-lock
371 * enqueues missing sub-lock.
373 cl_lock_state_set(env, parent, CLS_NEW);
375 * if last sub-lock is canceled, destroy the top-lock (which
376 * is now `empty') proactively.
378 if (lov->lls_nr_filled == 0) {
379 /* ... but unfortunately, this cannot be done easily,
380 * as cancellation of a top-lock might acquire mutices
381 * of its other sub-locks, violating lock ordering,
382 * see cl_lock_{cancel,delete}() preconditions.
384 * To work around this, the mutex of this sub-lock is
385 * released, top-lock is destroyed, and sub-lock mutex
386 * acquired again. The list of parents has to be
387 * re-scanned from the beginning after this.
389 * Only do this if no mutices other than on @child and
390 * @parent are held by the current thread.
392 * TODO: The lock modal here is too complex, because
393 * the lock may be canceled and deleted by voluntarily:
395 * -> osc_lock_enqueue_wait
396 * -> osc_lock_cancel_wait
398 * -> lovsub_lock_delete
399 * -> cl_lock_cancel/delete
402 * The better choice is to spawn a kernel thread for
405 if (cl_lock_nr_mutexed(env) == 2) {
/* Drop the child mutex to respect lock ordering before
 * tearing down the parent (see comment above). */
406 cl_lock_mutex_put(env, child);
407 cl_lock_cancel(env, parent);
408 cl_lock_delete(env, parent);
/* Reached only for a parent state this switch does not expect. */
415 CERROR("Impossible state: %i\n", parent->cll_state);
/*
 * cl_lock_operations::clo_delete() for lovsub ("bottom-to-top" delete,
 * e.g. triggered by ldlm LRU cancellation of the sub-lock): unlinks the
 * sub-lock from every parent and lets lovsub_lock_delete_one() adjust
 * each top-lock.
 *
 * NOTE(review): the enclosing restart loop (`do { ... } while (restart)`
 * or similar) implied by the comment and by the `restart =` assignment
 * is only partially visible — lines were dropped by extraction.
 */
423 * An implementation of cl_lock_operations::clo_delete() method. This is
424 * invoked in "bottom-to-top" delete, when lock destruction starts from the
425 * sub-lock (e.g, as a result of ldlm lock LRU policy).
427 static void lovsub_lock_delete(const struct lu_env *env,
428 const struct cl_lock_slice *slice)
430 struct cl_lock *child = slice->cls_lock;
431 struct lovsub_lock *sub = cl2lovsub_lock(slice);
434 LASSERT(cl_lock_is_mutexed(child));
438 * Destruction of a sub-lock might take multiple iterations, because
439 * when the last sub-lock of a given top-lock is deleted, top-lock is
440 * canceled proactively, and this requires to release sub-lock
441 * mutex. Once sub-lock mutex has been released, list of its parents
442 * has to be re-scanned from the beginning.
445 struct lov_lock *lov;
446 struct lov_lock_link *scan;
447 struct lov_lock_link *temp;
448 struct lov_lock_sub *subdata;
/* _safe variant: lov_lock_unlink() removes entries while iterating. */
451 list_for_each_entry_safe(scan, temp,
452 &sub->lss_parents, lll_list) {
453 lov = scan->lll_super;
454 subdata = &lov->lls_sub[scan->lll_idx];
455 lovsub_parent_lock(env, lov);
/* Restore the originally-requested descriptor for this stripe. */
456 subdata->sub_got = subdata->sub_descr;
457 lov_lock_unlink(env, scan, sub);
458 restart = lovsub_lock_delete_one(env, child, lov);
459 lovsub_parent_unlock(env, lov);
/* delete_one may have dropped the child mutex; take it back. */
462 cl_lock_mutex_get(env, child);
/*
 * cl_lock_operations::clo_print() for lovsub: prints, for each parent
 * link, the stripe index, the parent lov_lock pointer, and the parent
 * top-lock's descriptor.
 *
 * NOTE(review): the return statement is missing from this extraction.
 */
470 static int lovsub_lock_print(const struct lu_env *env, void *cookie,
471 lu_printer_t p, const struct cl_lock_slice *slice)
473 struct lovsub_lock *sub = cl2lovsub_lock(slice);
474 struct lov_lock *lov;
475 struct lov_lock_link *scan;
477 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
478 lov = scan->lll_super;
479 (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
481 cl_lock_descr_print(env, cookie, p,
482 &lov->lls_cl.cls_lock->cll_descr);
483 (*p)(env, cookie, "] ");
/*
 * Method table wiring the lovsub lock slice into the cl_lock framework;
 * installed on every slice by lovsub_lock_init() below.
 */
488 static const struct cl_lock_operations lovsub_lock_ops = {
489 .clo_fini = lovsub_lock_fini,
490 .clo_state = lovsub_lock_state,
491 .clo_delete = lovsub_lock_delete,
492 .clo_modify = lovsub_lock_modify,
493 .clo_closure = lovsub_lock_closure,
494 .clo_weigh = lovsub_lock_weigh,
495 .clo_print = lovsub_lock_print
/*
 * Allocates and initializes a lovsub lock slice for @lock on @obj and
 * adds it to the lock's slice list with lovsub_lock_ops.
 *
 * NOTE(review): the allocation-failure branch (lsk == NULL), the result
 * variable, and the return statement are missing from this extraction
 * (line jumps 505 -> 507 and after 508).
 */
498 int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
499 struct cl_lock *lock, const struct cl_io *io)
501 struct lovsub_lock *lsk;
505 OBD_SLAB_ALLOC_PTR_GFP(lsk, lovsub_lock_kmem, CFS_ALLOC_IO);
507 CFS_INIT_LIST_HEAD(&lsk->lss_parents);
508 cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);