Whamcloud - gitweb
land clio.
[fs/lustre-release.git] / lustre / lov / lovsub_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_lock for LOVSUB layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_LOV
42
43 #include "lov_cl_internal.h"
44
45 /** \addtogroup lov lov @{ */
46
47 /*****************************************************************************
48  *
49  * Lovsub lock operations.
50  *
51  */
52
53 static void lovsub_lock_fini(const struct lu_env *env,
54                              struct cl_lock_slice *slice)
55 {
56         struct lovsub_lock   *lsl;
57
58         ENTRY;
59         lsl = cl2lovsub_lock(slice);
60         LASSERT(list_empty(&lsl->lss_parents));
61         OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
62         EXIT;
63 }
64
65 static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
66 {
67         struct cl_lock *parent;
68
69         ENTRY;
70         parent = lov->lls_cl.cls_lock;
71         cl_lock_get(parent);
72         lu_ref_add(&parent->cll_reference, "lovsub-parent", cfs_current());
73         cl_lock_mutex_get(env, parent);
74         EXIT;
75 }
76
77 static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
78 {
79         struct cl_lock *parent;
80
81         ENTRY;
82         parent = lov->lls_cl.cls_lock;
83         cl_lock_mutex_put(env, lov->lls_cl.cls_lock);
84         lu_ref_del(&parent->cll_reference, "lovsub-parent", cfs_current());
85         cl_lock_put(env, parent);
86         EXIT;
87 }
88
89 static void lovsub_lock_state_one(const struct lu_env *env,
90                                   const struct lovsub_lock *lovsub,
91                                   struct lov_lock *lov)
92 {
93         struct cl_lock       *parent;
94         const struct cl_lock *child;
95
96         ENTRY;
97         parent = lov->lls_cl.cls_lock;
98         child  = lovsub->lss_cl.cls_lock;
99
100         if (lovsub->lss_active != parent) {
101                 lovsub_parent_lock(env, lov);
102                 if (child->cll_error != 0)
103                         cl_lock_error(env, parent, child->cll_error);
104                 else
105                         cl_lock_signal(env, parent);
106                 lovsub_parent_unlock(env, lov);
107         }
108         EXIT;
109 }
110
111 /**
112  * Implements cl_lock_operations::clo_state() method for lovsub layer, which
113  * method is called whenever sub-lock state changes. Propagates state change
114  * to the top-locks.
115  */
116 static void lovsub_lock_state(const struct lu_env *env,
117                               const struct cl_lock_slice *slice,
118                               enum cl_lock_state state)
119 {
120         struct lovsub_lock   *sub = cl2lovsub_lock(slice);
121         struct lov_lock_link *scan;
122         struct lov_lock_link *temp;
123
124         LASSERT(cl_lock_is_mutexed(slice->cls_lock));
125         ENTRY;
126
127         /*
128          * Use _safe() version, because
129          *
130          *     lovsub_lock_state_one()
131          *       ->cl_lock_error()
132          *         ->cl_lock_delete()
133          *           ->lov_lock_delete()
134          *
135          * can unlink parent from the parent list.
136          */
137         list_for_each_entry_safe(scan, temp, &sub->lss_parents, lll_list)
138                 lovsub_lock_state_one(env, sub, scan->lll_super);
139         EXIT;
140 }
141
142 /**
143  * Implementation of cl_lock_operation::clo_weigh() estimating lock weight by
144  * asking parent lock.
145  */
146 static unsigned long lovsub_lock_weigh(const struct lu_env *env,
147                                        const struct cl_lock_slice *slice)
148 {
149         struct lovsub_lock *lock = cl2lovsub_lock(slice);
150         struct lov_lock    *lov;
151         unsigned long       dumbbell;
152
153         ENTRY;
154
155         LASSERT(cl_lock_is_mutexed(slice->cls_lock));
156
157         if (!list_empty(&lock->lss_parents)) {
158                 /*
159                  * It is not clear whether all parents have to be asked and
160                  * their estimations summed, or it is enough to ask one. For
161                  * the current usages, one is always enough.
162                  */
163                 lov = container_of(lock->lss_parents.next,
164                                    struct lov_lock_link, lll_list)->lll_super;
165
166                 lovsub_parent_lock(env, lov);
167                 dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
168                 lovsub_parent_unlock(env, lov);
169         } else
170                 dumbbell = 0;
171
172         RETURN(dumbbell);
173 }
174
175 /**
176  * Maps start/end offsets within a stripe, to offsets within a file.
177  */
static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
                                  struct lov_object *obj,
                                  int stripe, struct cl_lock_descr *out)
{
        struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
        pgoff_t size; /* stripe size in pages */
        pgoff_t skip; /* how many pages in every stripe are occupied by
                       * "other" stripes */
        pgoff_t start;
        pgoff_t end;

        ENTRY;
        start = in->cld_start;
        end   = in->cld_end;

        /*
         * XXX join file support.
         */
        if (lsm->lsm_stripe_count > 1) {
                /* single-stripe files (count == 1) fall through: the
                 * stripe offsets already equal the file offsets. */
                size = cl_index(lov2cl(obj), lsm->lsm_stripe_size);
                skip = (lsm->lsm_stripe_count - 1) * size;

                /*
                 * With stripe offset decomposed as q*size + r, the file
                 * page is q*count*size + stripe*size + r, which equals
                 * the original offset plus q*skip + stripe*size below.
                 */
                /* XXX overflow check here? */
                start += start/size * skip + stripe * size;

                if (end != CL_PAGE_EOF) {
                        end += end/size * skip + stripe * size;
                        /*
                         * And check for overflow...
                         */
                        if (end < in->cld_end)
                                end = CL_PAGE_EOF;
                }
        }
        out->cld_start = start;
        out->cld_end   = end;
        /* NOTE: cld_obj and cld_mode of @out are left for the caller. */
        EXIT;
}
216
217 /**
218  * Adjusts parent lock extent when a sub-lock is attached to a parent. This is
219  * called in two ways:
220  *
221  *     - as part of receive call-back, when server returns granted extent to
222  *       the client, and
223  *
224  *     - when top-lock finds existing sub-lock in the cache.
225  *
226  * Note, that lock mode is not propagated to the parent: i.e., if CLM_READ
227  * top-lock matches CLM_WRITE sub-lock, top-lock is still CLM_READ.
228  */
229 int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
230                        struct lovsub_lock *sublock,
231                        const struct cl_lock_descr *d, int idx)
232 {
233         struct cl_lock       *parent;
234         struct cl_lock       *child;
235         struct lovsub_object *subobj;
236         struct cl_lock_descr *pd;
237         struct cl_lock_descr *parent_descr;
238         int                   result;
239
240         parent       = lov->lls_cl.cls_lock;
241         parent_descr = &parent->cll_descr;
242         LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));
243
244         child  = sublock->lss_cl.cls_lock;
245         subobj = cl2lovsub(sublock->lss_cl.cls_obj);
246         pd     = &lov_env_info(env)->lti_ldescr;
247
248         pd->cld_obj  = parent_descr->cld_obj;
249         pd->cld_mode = parent_descr->cld_mode;
250         lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
251         lov->lls_sub[idx].sub_got = *d;
252         /*
253          * Notify top-lock about modification, if lock description changes
254          * materially.
255          */
256         if (!cl_lock_ext_match(parent_descr, pd))
257                 result = cl_lock_modify(env, parent, pd);
258         else
259                 result = 0;
260         return result;
261 }
262
263 static int lovsub_lock_modify(const struct lu_env *env,
264                               const struct cl_lock_slice *s,
265                               const struct cl_lock_descr *d)
266 {
267         struct lovsub_lock   *lock   = cl2lovsub_lock(s);
268         struct lov_lock_link *scan;
269         struct lov_lock      *lov;
270         int result                   = 0;
271
272         ENTRY;
273
274         LASSERT(cl_lock_mode_match(d->cld_mode,
275                                    s->cls_lock->cll_descr.cld_mode));
276         list_for_each_entry(scan, &lock->lss_parents, lll_list) {
277                 int rc;
278
279                 lov = scan->lll_super;
280                 lovsub_parent_lock(env, lov);
281                 rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
282                 lovsub_parent_unlock(env, lov);
283                 result = result ?: rc;
284         }
285         RETURN(result);
286 }
287
288 static int lovsub_lock_closure(const struct lu_env *env,
289                                const struct cl_lock_slice *slice,
290                                struct cl_lock_closure *closure)
291 {
292         struct lovsub_lock   *sub;
293         struct cl_lock       *parent;
294         struct lov_lock_link *scan;
295         int                   result;
296
297         LASSERT(cl_lock_is_mutexed(slice->cls_lock));
298         ENTRY;
299
300         sub    = cl2lovsub_lock(slice);
301         result = 0;
302
303         list_for_each_entry(scan, &sub->lss_parents, lll_list) {
304                 parent = scan->lll_super->lls_cl.cls_lock;
305                 result = cl_lock_closure_build(env, parent, closure);
306                 if (result != 0)
307                         break;
308         }
309         RETURN(result);
310 }
311
312 /**
313  * An implementation of cl_lock_operations::clo_delete() method. This is
314  * invoked in "bottom-to-top" delete, when lock destruction starts from the
315  * sub-lock (e.g, as a result of ldlm lock LRU policy).
316  */
static void lovsub_lock_delete(const struct lu_env *env,
                               const struct cl_lock_slice *slice)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock      *lov;
        struct cl_lock       *parent;
        struct lov_lock_link *scan;
        struct lov_lock_link *temp;
        struct lov_lock_sub  *subdata;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
        ENTRY;

        /* _safe(): lov_lock_unlink() below removes @scan from the list. */
        list_for_each_entry_safe(scan, temp, &sub->lss_parents, lll_list) {
                lov     = scan->lll_super;
                subdata = &lov->lls_sub[scan->lll_idx];
                parent  = lov->lls_cl.cls_lock;
                lovsub_parent_lock(env, lov);
                /* reset granted extent back to the requested one */
                subdata->sub_got = subdata->sub_descr;
                lov_lock_unlink(env, scan, sub);
                CDEBUG(D_DLMTRACE, "%p %p %i %i\n", parent, sub,
                       lov->lls_nr_filled, parent->cll_state);
                switch (parent->cll_state) {
                case CLS_NEW:
                case CLS_QUEUING:
                case CLS_ENQUEUED:
                case CLS_FREEING:
                        /* wake up anybody waiting on the top-lock */
                        cl_lock_signal(env, parent);
                        break;
                case CLS_UNLOCKING:
                        /*
                         * Here lies a problem: a sub-lock is canceled while
                         * top-lock is being unlocked. Top-lock cannot be
                         * moved into CLS_NEW state, because unlocking has to
                         * succeed eventually by placing lock into CLS_CACHED
                         * (or failing it), see cl_unuse_try(). Nor can
                         * top-lock be left in CLS_CACHED state, because lov
                         * maintains an invariant that all sub-locks exist in
                         * CLS_CACHED (this allows cached top-lock to be
                         * reused immediately). Nor can we wait for top-lock
                         * state to change, because this can be synchronous to
                         * the current thread.
                         *
                         * We know for sure that lov_lock_unuse() will be
                         * called at least one more time to finish un-using,
                         * so leave a mark on the top-lock, that will be seen
                         * by the next call to lov_lock_unuse().
                         */
                        lov->lls_unuse_race = 1;
                        break;
                case CLS_CACHED:
                        /*
                         * A cached top-lock lost a sub-lock: demote it to
                         * CLS_NEW, and destroy it entirely once no filled
                         * sub-locks remain.
                         */
                        cl_lock_state_set(env, parent, CLS_NEW);
                        if (lov->lls_nr_filled == 0) {
                                cl_lock_cancel(env, parent);
                                cl_lock_delete(env, parent);
                                cl_lock_signal(env, parent);
                        }
                        break;
                case CLS_HELD:
                default:
                        /* a held top-lock pins its sub-locks, so a sub-lock
                         * cannot be deleted from under it */
                        CERROR("Impossible state: %i\n", parent->cll_state);
                        LBUG();
                }
                lovsub_parent_unlock(env, lov);
        }
        EXIT;
}
384
385 static int lovsub_lock_print(const struct lu_env *env, void *cookie,
386                              lu_printer_t p, const struct cl_lock_slice *slice)
387 {
388         struct lovsub_lock   *sub = cl2lovsub_lock(slice);
389         struct lov_lock      *lov;
390         struct lov_lock_link *scan;
391
392         list_for_each_entry(scan, &sub->lss_parents, lll_list) {
393                 lov = scan->lll_super;
394                 (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
395                 if (lov != NULL)
396                         cl_lock_descr_print(env, cookie, p,
397                                             &lov->lls_cl.cls_lock->cll_descr);
398                 (*p)(env, cookie, "] ");
399         }
400         return 0;
401 }
402
/** cl_lock operations vector for the lovsub layer. */
static const struct cl_lock_operations lovsub_lock_ops = {
        .clo_fini    = lovsub_lock_fini,
        .clo_state   = lovsub_lock_state,
        .clo_delete  = lovsub_lock_delete,
        .clo_modify  = lovsub_lock_modify,
        .clo_closure = lovsub_lock_closure,
        .clo_weigh   = lovsub_lock_weigh,
        .clo_print   = lovsub_lock_print
};
412
413 int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
414                      struct cl_lock *lock, const struct cl_io *io)
415 {
416         struct lovsub_lock *lsk;
417         int result;
418
419         ENTRY;
420         OBD_SLAB_ALLOC_PTR(lsk, lovsub_lock_kmem);
421         if (lsk != NULL) {
422                 CFS_INIT_LIST_HEAD(&lsk->lss_parents);
423                 cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
424                 result = 0;
425         } else
426                 result = -ENOMEM;
427         RETURN(result);
428 }
429
430 /** @} lov */