Whamcloud - gitweb
03bd7b606c939782149fe8ce913cbeb2cd2814ea
[fs/lustre-release.git] / lustre / ofd / ofd_dlm.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ofd/ofd_dlm.c
37  *
38  * Author: Mike Pershin <tappro@whamcloud.com>
39  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_FILTER
43
44 #include "ofd_internal.h"
45
46 struct ofd_intent_args {
47         struct ldlm_lock        **victim;
48         __u64                    size;
49         int                     *liblustre;
50 };
51
52 static enum interval_iter ofd_intent_cb(struct interval_node *n, void *args)
53 {
54         struct ldlm_interval     *node = (struct ldlm_interval *)n;
55         struct ofd_intent_args   *arg = args;
56         __u64                     size = arg->size;
57         struct ldlm_lock        **v = arg->victim;
58         struct ldlm_lock         *lck;
59
60         /* If the interval is lower than the current file size, just break. */
61         if (interval_high(n) <= size)
62                 return INTERVAL_ITER_STOP;
63
64         list_for_each_entry(lck, &node->li_group, l_sl_policy) {
65                 /* Don't send glimpse ASTs to liblustre clients.
66                  * They aren't listening for them, and they do
67                  * entirely synchronous I/O anyways. */
68                 if (lck->l_export == NULL || lck->l_export->exp_libclient)
69                         continue;
70
71                 if (*arg->liblustre)
72                         *arg->liblustre = 0;
73
74                 if (*v == NULL) {
75                         *v = LDLM_LOCK_GET(lck);
76                 } else if ((*v)->l_policy_data.l_extent.start <
77                            lck->l_policy_data.l_extent.start) {
78                         LDLM_LOCK_RELEASE(*v);
79                         *v = LDLM_LOCK_GET(lck);
80                 }
81
82                 /* the same policy group - every lock has the
83                  * same extent, so needn't do it any more */
84                 break;
85         }
86
87         return INTERVAL_ITER_CONT;
88 }
89
90 int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
91                       void *req_cookie, ldlm_mode_t mode, __u64 flags,
92                       void *data)
93 {
94         struct ptlrpc_request           *req = req_cookie;
95         struct ldlm_lock                *lock = *lockp, *l = NULL;
96         struct ldlm_resource            *res = lock->l_resource;
97         ldlm_processing_policy           policy;
98         struct ost_lvb                  *res_lvb, *reply_lvb;
99         struct ldlm_reply               *rep;
100         ldlm_error_t                     err;
101         int                              idx, rc, only_liblustre = 1;
102         struct ldlm_interval_tree       *tree;
103         struct ofd_intent_args           arg;
104         __u32                            repsize[3] = {
105                 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
106                 [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
107                 [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb)
108         };
109         struct ldlm_glimpse_work         gl_work;
110         struct list_head                 gl_list;
111         ENTRY;
112
113         INIT_LIST_HEAD(&gl_list);
114         lock->l_lvb_type = LVB_T_OST;
115         policy = ldlm_get_processing_policy(res);
116         LASSERT(policy != NULL);
117         LASSERT(req != NULL);
118
119         rc = lustre_pack_reply(req, 3, repsize, NULL);
120         if (rc)
121                 RETURN(req->rq_status = rc);
122
123         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
124         LASSERT(rep != NULL);
125
126         reply_lvb = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
127                                    sizeof(*reply_lvb));
128         LASSERT(reply_lvb != NULL);
129
130         /* Call the extent policy function to see if our request can be
131          * granted, or is blocked.
132          * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse
133          * lock, and should not be granted if the lock will be blocked.
134          */
135
136         if (flags & LDLM_FL_BLOCK_NOWAIT) {
137                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5);
138
139                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK))
140                         RETURN(ELDLM_LOCK_ABORTED);
141         }
142
143         LASSERT(ns == ldlm_res_to_ns(res));
144         lock_res(res);
145
146         /* Check if this is a resend case (MSG_RESENT is set on RPC) and a
147          * lock was found by ldlm_handle_enqueue(); if so no need to grant
148          * it again. */
149         if (flags & LDLM_FL_RESENT) {
150                 rc = LDLM_ITER_CONTINUE;
151         } else {
152                 __u64 tmpflags = 0;
153                 rc = policy(lock, &tmpflags, 0, &err, NULL);
154                 check_res_locked(res);
155         }
156
157         /* The lock met with no resistance; we're finished. */
158         if (rc == LDLM_ITER_CONTINUE) {
159                 /* do not grant locks to the liblustre clients: they cannot
160                  * handle ASTs robustly.  We need to do this while still
161                  * holding ns_lock to avoid the lock remaining on the res_link
162                  * list (and potentially being added to l_pending_list by an
163                  * AST) when we are going to drop this lock ASAP. */
164                 if (lock->l_export->exp_libclient ||
165                     OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
166                         ldlm_resource_unlink_lock(lock);
167                         err = ELDLM_LOCK_ABORTED;
168                 } else {
169                         err = ELDLM_LOCK_REPLACED;
170                 }
171                 unlock_res(res);
172                 RETURN(err);
173         } else if (flags & LDLM_FL_BLOCK_NOWAIT) {
174                 /* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse
175                  * callback for glimpse size. The real size user will trigger
176                  * the glimpse callback when necessary. */
177                 unlock_res(res);
178                 RETURN(ELDLM_LOCK_ABORTED);
179         }
180
181         /* Do not grant any lock, but instead send GL callbacks.  The extent
182          * policy nicely created a list of all PW locks for us.  We will choose
183          * the highest of those which are larger than the size in the LVB, if
184          * any, and perform a glimpse callback. */
185         res_lvb = res->lr_lvb_data;
186         LASSERT(res_lvb != NULL);
187         *reply_lvb = *res_lvb;
188
189         /*
190          * ->ns_lock guarantees that no new locks are granted, and,
191          *  therefore, that res->lr_lvb_data cannot increase beyond the
192          *  end of already granted lock. As a result, it is safe to
193          *  check against "stale" reply_lvb->lvb_size value without
194          *  res->lr_lvb_sem.
195          */
196         arg.size = reply_lvb->lvb_size;
197         arg.victim = &l;
198         arg.liblustre = &only_liblustre;
199
200         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
201                 tree = &res->lr_itree[idx];
202                 if (tree->lit_mode == LCK_PR)
203                         continue;
204
205                 interval_iterate_reverse(tree->lit_root, ofd_intent_cb, &arg);
206         }
207         unlock_res(res);
208
209         /* There were no PW locks beyond the size in the LVB; finished. */
210         if (l == NULL) {
211                 if (only_liblustre) {
212                         /* If we discovered a liblustre client with a PW lock,
213                          * however, the LVB may be out of date!  The LVB is
214                          * updated only on glimpse (which we don't do for
215                          * liblustre clients) and cancel (which the client
216                          * obviously has not yet done).  So if it has written
217                          * data but kept the lock, the LVB is stale and needs
218                          * to be updated from disk.
219                          *
220                          * Of course, this will all disappear when we switch to
221                          * taking liblustre locks on the OST. */
222                         ldlm_res_lvbo_update(res, NULL, 1);
223                 }
224                 RETURN(ELDLM_LOCK_ABORTED);
225         }
226
227         /*
228          * This check is for lock taken in ofd_prepare_destroy() that does
229          * not have l_glimpse_ast set. So the logic is: if there is a lock
230          * with no l_glimpse_ast set, this object is being destroyed already.
231          * Hence, if you are grabbing DLM locks on the server, always set
232          * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
233          */
234         if (l->l_glimpse_ast == NULL) {
235                 /* We are racing with unlink(); just return -ENOENT */
236                 rep->lock_policy_res1 = ptlrpc_status_hton(-ENOENT);
237                 goto out;
238         }
239
240         /* Populate the gl_work structure.
241          * Grab additional reference on the lock which will be released in
242          * ldlm_work_gl_ast_lock() */
243         gl_work.gl_lock = LDLM_LOCK_GET(l);
244         /* The glimpse callback is sent to one single extent lock. As a result,
245          * the gl_work list is just composed of one element */
246         list_add_tail(&gl_work.gl_list, &gl_list);
247         /* There is actually no need for a glimpse descriptor when glimpsing
248          * extent locks */
249         gl_work.gl_desc = NULL;
250         /* the ldlm_glimpse_work structure is allocated on the stack */
251         gl_work.gl_flags = LDLM_GL_WORK_NOFREE;
252
253         rc = ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */
254
255         if (!list_empty(&gl_list))
256                 LDLM_LOCK_RELEASE(l);
257
258         lock_res(res);
259         *reply_lvb = *res_lvb;
260         unlock_res(res);
261
262 out:
263         LDLM_LOCK_RELEASE(l);
264
265         RETURN(ELDLM_LOCK_ABORTED);
266 }
267