1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Implementation of cl_lock for LOVSUB layer.
38 * Author: Nikita Danilov <nikita.danilov@sun.com>
41 #define DEBUG_SUBSYSTEM S_LOV
43 #include "lov_cl_internal.h"
45 /** \addtogroup lov lov @{ */
47 /*****************************************************************************
49 * Lovsub lock operations.
/*
 * Implements cl_lock_operations::clo_fini() for the lovsub layer: returns
 * the lovsub_lock slice to its slab cache.
 *
 * NOTE(review): the function's braces and ENTRY/EXIT lines are missing from
 * this extract; the code tokens below are preserved verbatim.
 */
53 static void lovsub_lock_fini(const struct lu_env *env,
54 struct cl_lock_slice *slice)
56 struct lovsub_lock *lsl;
59 lsl = cl2lovsub_lock(slice);
/* A surviving parent link here would mean some lov_lock still points at
 * this slice; freeing it would leave a dangling pointer. */
60 LASSERT(list_empty(&lsl->lss_parents));
61 OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
/*
 * Acquires the mutex of the parent (top-level lov) cl_lock and records the
 * reference with lu_ref for leak tracking.  Paired with
 * lovsub_parent_unlock().
 *
 * NOTE(review): lovsub_parent_unlock() below drops a lock reference via
 * cl_lock_put(), but no matching cl_lock_get() is visible in this extract —
 * presumably lost in extraction; confirm against the full source.
 */
65 static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
67 struct cl_lock *parent;
70 parent = lov->lls_cl.cls_lock;
72 lu_ref_add(&parent->cll_reference, "lovsub-parent", cfs_current());
73 cl_lock_mutex_get(env, parent);
/*
 * Releases the parent top-lock acquired by lovsub_parent_lock(): drops its
 * mutex, removes the lu_ref annotation, and drops the lock reference.
 */
77 static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
79 struct cl_lock *parent;
82 parent = lov->lls_cl.cls_lock;
/* NOTE(review): lov->lls_cl.cls_lock is the same lock as `parent' (assigned
 * just above); passing `parent' here would read more symmetrically with the
 * lu_ref_del()/cl_lock_put() calls that follow. */
83 cl_lock_mutex_put(env, lov->lls_cl.cls_lock);
84 lu_ref_del(&parent->cll_reference, "lovsub-parent", cfs_current());
85 cl_lock_put(env, parent);
/*
 * Propagates a sub-lock state change to one parent lov_lock: takes the
 * parent mutex, forwards any sub-lock error to the parent, and signals
 * threads waiting on the parent's state.
 *
 * The parent recorded in lss_active is skipped — presumably it is the
 * parent currently driving this sub-lock, and re-taking its mutex from here
 * would self-deadlock; TODO(review): confirm lss_active semantics against
 * the full source.
 *
 * NOTE(review): the third parameter (the `lov' parent, used throughout the
 * body) is missing from this extract, along with the braces.
 */
89 static void lovsub_lock_state_one(const struct lu_env *env,
90 const struct lovsub_lock *lovsub,
93 struct cl_lock *parent;
94 const struct cl_lock *child;
97 parent = lov->lls_cl.cls_lock;
98 child = lovsub->lss_cl.cls_lock;
100 if (lovsub->lss_active != parent) {
101 lovsub_parent_lock(env, lov);
/* Forward a sub-lock failure to the top-lock before waking its waiters. */
102 if (child->cll_error != 0)
103 cl_lock_error(env, parent, child->cll_error);
105 cl_lock_signal(env, parent);
106 lovsub_parent_unlock(env, lov);
/*
112 * Implements cl_lock_operations::clo_state() method for lovsub layer, which
113 * method is called whenever sub-lock state changes. Propagates state change
 * to every parent lov_lock attached to this sub-lock.
 *
 * NOTE(review): the `state' parameter is not referenced in the visible body;
 * each parent re-reads the child's state inside lovsub_lock_state_one() —
 * confirm against the full source.
 */
116 static void lovsub_lock_state(const struct lu_env *env,
117 const struct cl_lock_slice *slice,
118 enum cl_lock_state state)
120 struct lovsub_lock *sub = cl2lovsub_lock(slice);
121 struct lov_lock_link *scan;
122 struct lov_lock_link *temp;
/* clo_state() is called with the sub-lock mutex held. */
124 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
/*
128 * Use _safe() version, because
130 * lovsub_lock_state_one()
133 * ->lov_lock_delete()
135 * can unlink parent from the parent list.
 */
137 list_for_each_entry_safe(scan, temp, &sub->lss_parents, lll_list)
138 lovsub_lock_state_one(env, sub, scan->lll_super);
/*
143 * Implementation of cl_lock_operation::clo_weigh() estimating lock weight by
144 * asking parent lock.
 *
 * NOTE(review): the else branch (weight of a parentless sub-lock) and the
 * return of `dumbbell' are missing from this extract.
 */
146 static unsigned long lovsub_lock_weigh(const struct lu_env *env,
147 const struct cl_lock_slice *slice)
149 struct lovsub_lock *lock = cl2lovsub_lock(slice);
150 struct lov_lock *lov;
151 unsigned long dumbbell;
155 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
157 if (!list_empty(&lock->lss_parents)) {
/*
159 * It is not clear whether all parents have to be asked and
160 * their estimations summed, or it is enough to ask one. For
161 * the current usages, one is always enough.
 */
/* First link on lss_parents -> its owning lov_lock. */
163 lov = container_of(lock->lss_parents.next,
164 struct lov_lock_link, lll_list)->lll_super;
/* Delegate the estimate to the top-lock, under its mutex. */
166 lovsub_parent_lock(env, lov);
167 dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
168 lovsub_parent_unlock(env, lov);
/*
176 * Maps start/end offsets within a stripe, to offsets within a file.
 *
 * For a striped file, a page at stripe-local index i of stripe `stripe'
 * lives at file index  i + (i / size) * skip + stripe * size,  where `size'
 * is the stripe size in pages and `skip' is the span of all other stripes
 * between two consecutive chunks of this stripe.
 *
 * NOTE(review): the declarations of `start'/`end', the initialization
 * `end = in->cld_end', the body of the overflow branch (presumably clamping
 * to CL_PAGE_EOF), and the `out->cld_end' assignment are missing from this
 * extract.
 */
178 static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
179 struct lov_object *obj,
180 int stripe, struct cl_lock_descr *out)
182 struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
183 pgoff_t size; /* stripe size in pages */
184 pgoff_t skip; /* how many pages in every stripe are occupied by
190 start = in->cld_start;
/*
194 * XXX join file support.
 */
/* Single-stripe files need no translation: stripe == file offsets. */
196 if (lsm->lsm_stripe_count > 1) {
197 size = cl_index(lov2cl(obj), lsm->lsm_stripe_size);
198 skip = (lsm->lsm_stripe_count - 1) * size;
/* XXX overflow check here? */
201 start += start/size * skip + stripe * size;
/* CL_PAGE_EOF ("lock to end of file") must stay EOF, not be shifted. */
203 if (end != CL_PAGE_EOF) {
204 end += end/size * skip + stripe * size;
/*
206 * And check for overflow...
 */
208 if (end < in->cld_end)
212 out->cld_start = start;
/*
218 * Adjusts parent lock extent when a sub-lock is attached to a parent. This is
219 * called in two ways:
 *
221 * - as part of receive call-back, when server returns granted extent to
 *   the client, and
 *
224 * - when top-lock finds existing sub-lock in the cache.
 *
226 * Note, that lock mode is not propagated to the parent: i.e., if CLM_READ
227 * top-lock matches CLM_WRITE sub-lock, top-lock is still CLM_READ.
 *
 * NOTE(review): the declaration of `result' and the final return are
 * missing from this extract.
 */
229 int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
230 struct lovsub_lock *sublock,
231 const struct cl_lock_descr *d, int idx)
233 struct cl_lock *parent;
234 struct cl_lock *child;
235 struct lovsub_object *subobj;
236 struct cl_lock_descr *pd;
237 struct cl_lock_descr *parent_descr;
240 parent = lov->lls_cl.cls_lock;
241 parent_descr = &parent->cll_descr;
/* The new sub-lock extent must be compatible with the top-lock's mode. */
242 LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));
244 child = sublock->lss_cl.cls_lock;
245 subobj = cl2lovsub(sublock->lss_cl.cls_obj);
/* Scratch descriptor from per-environment lov info — no allocation. */
246 pd = &lov_env_info(env)->lti_ldescr;
248 pd->cld_obj = parent_descr->cld_obj;
249 pd->cld_mode = parent_descr->cld_mode;
/* Translate the stripe-local extent `d' into file offsets in `pd'. */
250 lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
/* Remember what the server actually granted for this stripe. */
251 lov->lls_sub[idx].sub_got = *d;
/*
253 * Notify top-lock about modification, if lock description changes
 * materially (extents no longer match).
 */
256 if (!cl_lock_ext_match(parent_descr, pd))
257 result = cl_lock_modify(env, parent, pd);
/*
 * Implements cl_lock_operations::clo_modify() for the lovsub layer: applies
 * the new description `d' of this sub-lock to every attached parent
 * lov_lock via lov_sublock_modify().
 *
 * NOTE(review): the declarations/initialization of `result' and `rc' and
 * the final return are missing from this extract.
 */
263 static int lovsub_lock_modify(const struct lu_env *env,
264 const struct cl_lock_slice *s,
265 const struct cl_lock_descr *d)
267 struct lovsub_lock *lock = cl2lovsub_lock(s);
268 struct lov_lock_link *scan;
269 struct lov_lock *lov;
/* Modification must not change the lock mode class. */
274 LASSERT(cl_lock_mode_match(d->cld_mode,
275 s->cls_lock->cll_descr.cld_mode));
276 list_for_each_entry(scan, &lock->lss_parents, lll_list) {
279 lov = scan->lll_super;
280 lovsub_parent_lock(env, lov);
281 rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
282 lovsub_parent_unlock(env, lov);
/* GNU `?:' — keep the first non-zero result, but still visit all
 * parents. */
283 result = result ?: rc;
/*
 * Implements cl_lock_operations::clo_closure() for the lovsub layer: adds
 * every parent top-lock of this sub-lock to the closure being built.
 *
 * NOTE(review): the lines between the loop body and the function end
 * (presumably an early break on cl_lock_closure_build() failure and the
 * return of `result') are missing from this extract, as is the declaration
 * of `result'.
 */
288 static int lovsub_lock_closure(const struct lu_env *env,
289 const struct cl_lock_slice *slice,
290 struct cl_lock_closure *closure)
292 struct lovsub_lock *sub;
293 struct cl_lock *parent;
294 struct lov_lock_link *scan;
/* clo_closure() is called with the sub-lock mutex held. */
297 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
300 sub = cl2lovsub_lock(slice);
303 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
304 parent = scan->lll_super->lls_cl.cls_lock;
305 result = cl_lock_closure_build(env, parent, closure);
/*
313 * An implementation of cl_lock_operations::clo_delete() method. This is
314 * invoked in "bottom-to-top" delete, when lock destruction starts from the
315 * sub-lock (e.g, as a result of ldlm lock LRU policy).
 *
 * For each parent still linked to this sub-lock: take the parent mutex,
 * reset the cached granted extent back to the requested one, unlink the
 * parent from the sub-lock, and then react according to the parent's state.
 *
 * NOTE(review): the `case' labels of the switch below, the `break's, and
 * the closing braces are missing from this extract — the cl_lock_signal()
 * at original line 344, the unuse-race marking at 365, the CLS_NEW
 * transition at 368, and the cancel/delete sequence at 370-372 belong to
 * DIFFERENT states of the parent; consult the full source for the exact
 * labels.
 */
317 static void lovsub_lock_delete(const struct lu_env *env,
318 const struct cl_lock_slice *slice)
320 struct lovsub_lock *sub = cl2lovsub_lock(slice);
321 struct lov_lock *lov;
322 struct cl_lock *parent;
323 struct lov_lock_link *scan;
324 struct lov_lock_link *temp;
325 struct lov_lock_sub *subdata;
/* clo_delete() is called with the sub-lock mutex held. */
327 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
/* _safe(): lov_lock_unlink() below removes `scan' from the list. */
330 list_for_each_entry_safe(scan, temp, &sub->lss_parents, lll_list) {
331 lov = scan->lll_super;
332 subdata = &lov->lls_sub[scan->lll_idx];
333 parent = lov->lls_cl.cls_lock;
334 lovsub_parent_lock(env, lov);
/* Forget the server-granted extent; fall back to what was asked for. */
335 subdata->sub_got = subdata->sub_descr;
336 lov_lock_unlink(env, scan, sub);
337 CDEBUG(D_DLMTRACE, "%p %p %i %i\n", parent, sub,
338 lov->lls_nr_filled, parent->cll_state);
339 switch (parent->cll_state) {
/* Wake any thread waiting on the parent's state change. */
344 cl_lock_signal(env, parent);
/*
348 * Here lies a problem: a sub-lock is canceled while
349 * top-lock is being unlocked. Top-lock cannot be
350 * moved into CLS_NEW state, because unlocking has to
351 * succeed eventually by placing lock into CLS_CACHED
352 * (or failing it), see cl_unuse_try(). Nor can
353 * top-lock be left in CLS_CACHED state, because lov
354 * maintains an invariant that all sub-locks exist in
355 * CLS_CACHED (this allows cached top-lock to be
356 * reused immediately). Nor can we wait for top-lock
357 * state to change, because this can be synchronous to
358 * the current thread.
 *
360 * We know for sure that lov_lock_unuse() will be
361 * called at least one more time to finish un-using,
362 * so leave a mark on the top-lock, that will be seen
363 * by the next call to lov_lock_unuse().
 */
365 lov->lls_unuse_race = 1;
368 cl_lock_state_set(env, parent, CLS_NEW);
/* Last sub-lock gone: the top-lock has nothing left to protect. */
369 if (lov->lls_nr_filled == 0) {
370 cl_lock_cancel(env, parent);
371 cl_lock_delete(env, parent);
372 cl_lock_signal(env, parent);
/* default: a parent state no deletion path should ever observe. */
377 CERROR("Impossible state: %i\n", parent->cll_state);
380 lovsub_parent_unlock(env, lov);
/*
 * Implements cl_lock_operations::clo_print() for the lovsub layer: for each
 * parent lov_lock prints its stripe index, pointer, and the top-lock's
 * descriptor through the supplied printer callback.
 *
 * NOTE(review): the closing braces and the return value are missing from
 * this extract.
 */
385 static int lovsub_lock_print(const struct lu_env *env, void *cookie,
386 lu_printer_t p, const struct cl_lock_slice *slice)
388 struct lovsub_lock *sub = cl2lovsub_lock(slice);
389 struct lov_lock *lov;
390 struct lov_lock_link *scan;
392 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
393 lov = scan->lll_super;
394 (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
396 cl_lock_descr_print(env, cookie, p,
397 &lov->lls_cl.cls_lock->cll_descr);
398 (*p)(env, cookie, "] ");
/*
 * Method table wiring the lovsub slice into the generic cl_lock machinery;
 * passed to cl_lock_slice_add() in lovsub_lock_init() below.
 *
 * NOTE(review): the closing `};' is missing from this extract.
 */
403 static const struct cl_lock_operations lovsub_lock_ops = {
404 .clo_fini = lovsub_lock_fini,
405 .clo_state = lovsub_lock_state,
406 .clo_delete = lovsub_lock_delete,
407 .clo_modify = lovsub_lock_modify,
408 .clo_closure = lovsub_lock_closure,
409 .clo_weigh = lovsub_lock_weigh,
410 .clo_print = lovsub_lock_print
/*
 * Allocates and attaches the lovsub lock slice for a new cl_lock on a
 * lovsub object: slab-allocates the slice, initializes its (empty) list of
 * parent lov_locks, and registers it with the generic lock via
 * cl_lock_slice_add() using lovsub_lock_ops above.
 *
 * NOTE(review): the NULL-check of the allocation (with the -ENOMEM
 * fallback) and the function's return are missing from this extract.
 */
413 int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
414 struct cl_lock *lock, const struct cl_io *io)
416 struct lovsub_lock *lsk;
420 OBD_SLAB_ALLOC_PTR(lsk, lovsub_lock_kmem);
422 CFS_INIT_LIST_HEAD(&lsk->lss_parents);
423 cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);