Whamcloud - gitweb
LU-8726 osd-ldiskfs: bypass read for benchmarking
[fs/lustre-release.git] / lustre / ofd / ofd_dlm.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_dlm.c
33  *
34  * This file contains OBD Filter Device (OFD) LDLM-related code which is just
35  * intent handling for glimpse lock.
36  *
37  * Author: Andreas Dilger <andreas.dilger@intel.com>
38  * Author: Jinshan Xiong <jinshan.xiong@intel.com>
39  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
40  * Author: Mikhail Pershin <mike.pershin@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_FILTER
44
45 #include "ofd_internal.h"
46
47 struct ofd_intent_args {
48         struct ldlm_lock        **victim;
49         __u64                    size;
50         int                     *liblustre;
51 };
52
53 /**
54  * OFD interval callback.
55  *
56  * The interval_callback_t is part of interval_iterate_reverse() and is called
57  * for each interval in tree. The OFD interval callback searches for locks
58  * covering extents beyond the given args->size. This is used to decide if LVB
59  * data is outdated.
60  *
61  * \param[in] n         interval node
62  * \param[in] args      intent arguments
63  *
64  * \retval              INTERVAL_ITER_STOP if the interval is lower than
65  *                      file size, caller stops execution
66  * \retval              INTERVAL_ITER_CONT if callback finished successfully
67  *                      and caller may continue execution
68  */
69 static enum interval_iter ofd_intent_cb(struct interval_node *n, void *args)
70 {
71         struct ldlm_interval     *node = (struct ldlm_interval *)n;
72         struct ofd_intent_args   *arg = args;
73         __u64                     size = arg->size;
74         struct ldlm_lock        **v = arg->victim;
75         struct ldlm_lock         *lck;
76
77         /* If the interval is lower than the current file size, just break. */
78         if (interval_high(n) <= size)
79                 return INTERVAL_ITER_STOP;
80
81         list_for_each_entry(lck, &node->li_group, l_sl_policy) {
82                 /* Don't send glimpse ASTs to liblustre clients.
83                  * They aren't listening for them, and they do
84                  * entirely synchronous I/O anyways. */
85                 if (lck->l_export == NULL || lck->l_export->exp_libclient)
86                         continue;
87
88                 if (*arg->liblustre)
89                         *arg->liblustre = 0;
90
91                 if (*v == NULL) {
92                         *v = LDLM_LOCK_GET(lck);
93                 } else if ((*v)->l_policy_data.l_extent.start <
94                            lck->l_policy_data.l_extent.start) {
95                         LDLM_LOCK_RELEASE(*v);
96                         *v = LDLM_LOCK_GET(lck);
97                 }
98
99                 /* the same policy group - every lock has the
100                  * same extent, so needn't do it any more */
101                 break;
102         }
103
104         return INTERVAL_ITER_CONT;
105 }
106
107 /**
108  * OFD lock intent policy
109  *
110  * This defines ldlm_namespace::ns_policy interface for OFD.
111  * Intent policy is called when lock has an intent, for OFD that
112  * means glimpse lock and policy fills Lock Value Block (LVB).
113  *
114  * If already granted lock is found it will be placed in \a lockp and
115  * returned back to caller function.
116  *
117  * \param[in] ns         namespace
118  * \param[in,out] lockp  pointer to the lock
119  * \param[in] req_cookie incoming request
120  * \param[in] mode       LDLM mode
121  * \param[in] flags      LDLM flags
122  * \param[in] data       opaque data, not used in OFD policy
123  *
124  * \retval              ELDLM_LOCK_REPLACED if already granted lock was found
125  *                      and placed in \a lockp
126  * \retval              ELDLM_LOCK_ABORTED in other cases except error
127  * \retval              negative value on error
128  */
129 int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
130                       void *req_cookie, enum ldlm_mode mode, __u64 flags,
131                       void *data)
132 {
133         struct ptlrpc_request *req = req_cookie;
134         struct ldlm_lock *lock = *lockp, *l = NULL;
135         struct ldlm_resource *res = lock->l_resource;
136         ldlm_processing_policy policy;
137         struct ost_lvb *res_lvb, *reply_lvb;
138         struct ldlm_reply *rep;
139         enum ldlm_error err;
140         int idx, rc, only_liblustre = 1;
141         struct ldlm_interval_tree *tree;
142         struct ofd_intent_args arg;
143         __u32 repsize[3] = {
144                 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
145                 [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
146                 [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb)
147         };
148         struct ldlm_glimpse_work gl_work = {};
149         struct list_head gl_list;
150         ENTRY;
151
152         INIT_LIST_HEAD(&gl_list);
153         lock->l_lvb_type = LVB_T_OST;
154         policy = ldlm_get_processing_policy(res);
155         LASSERT(policy != NULL);
156         LASSERT(req != NULL);
157
158         rc = lustre_pack_reply(req, 3, repsize, NULL);
159         if (rc)
160                 RETURN(req->rq_status = rc);
161
162         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
163         LASSERT(rep != NULL);
164
165         reply_lvb = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
166                                    sizeof(*reply_lvb));
167         LASSERT(reply_lvb != NULL);
168
169         /* Call the extent policy function to see if our request can be
170          * granted, or is blocked.
171          * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse
172          * lock, and should not be granted if the lock will be blocked.
173          */
174
175         if (flags & LDLM_FL_BLOCK_NOWAIT) {
176                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5);
177
178                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK))
179                         RETURN(ELDLM_LOCK_ABORTED);
180         }
181
182         LASSERT(ns == ldlm_res_to_ns(res));
183         lock_res(res);
184
185         /* Check if this is a resend case (MSG_RESENT is set on RPC) and a
186          * lock was found by ldlm_handle_enqueue(); if so no need to grant
187          * it again. */
188         if (flags & LDLM_FL_RESENT) {
189                 rc = LDLM_ITER_CONTINUE;
190         } else {
191                 __u64 tmpflags = 0;
192                 rc = policy(lock, &tmpflags, 0, &err, NULL);
193                 check_res_locked(res);
194         }
195
196         /* The lock met with no resistance; we're finished. */
197         if (rc == LDLM_ITER_CONTINUE) {
198                 /* do not grant locks to the liblustre clients: they cannot
199                  * handle ASTs robustly.  We need to do this while still
200                  * holding ns_lock to avoid the lock remaining on the res_link
201                  * list (and potentially being added to l_pending_list by an
202                  * AST) when we are going to drop this lock ASAP. */
203                 if (lock->l_export->exp_libclient ||
204                     OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
205                         ldlm_resource_unlink_lock(lock);
206                         err = ELDLM_LOCK_ABORTED;
207                 } else {
208                         err = ELDLM_LOCK_REPLACED;
209                 }
210                 unlock_res(res);
211                 RETURN(err);
212         } else if (flags & LDLM_FL_BLOCK_NOWAIT) {
213                 /* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse
214                  * callback for glimpse size. The real size user will trigger
215                  * the glimpse callback when necessary. */
216                 unlock_res(res);
217                 RETURN(ELDLM_LOCK_ABORTED);
218         }
219
220         /* Do not grant any lock, but instead send GL callbacks.  The extent
221          * policy nicely created a list of all PW locks for us.  We will choose
222          * the highest of those which are larger than the size in the LVB, if
223          * any, and perform a glimpse callback. */
224         res_lvb = res->lr_lvb_data;
225         LASSERT(res_lvb != NULL);
226         *reply_lvb = *res_lvb;
227
228         /*
229          * ->ns_lock guarantees that no new locks are granted, and,
230          *  therefore, that res->lr_lvb_data cannot increase beyond the
231          *  end of already granted lock. As a result, it is safe to
232          *  check against "stale" reply_lvb->lvb_size value without
233          *  res->lr_lvb_sem.
234          */
235         arg.size = reply_lvb->lvb_size;
236         arg.victim = &l;
237         arg.liblustre = &only_liblustre;
238
239         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
240                 tree = &res->lr_itree[idx];
241                 if (tree->lit_mode == LCK_PR)
242                         continue;
243
244                 interval_iterate_reverse(tree->lit_root, ofd_intent_cb, &arg);
245         }
246         unlock_res(res);
247
248         /* There were no PW locks beyond the size in the LVB; finished. */
249         if (l == NULL) {
250                 if (only_liblustre) {
251                         /* If we discovered a liblustre client with a PW lock,
252                          * however, the LVB may be out of date!  The LVB is
253                          * updated only on glimpse (which we don't do for
254                          * liblustre clients) and cancel (which the client
255                          * obviously has not yet done).  So if it has written
256                          * data but kept the lock, the LVB is stale and needs
257                          * to be updated from disk.
258                          *
259                          * Of course, this will all disappear when we switch to
260                          * taking liblustre locks on the OST. */
261                         ldlm_res_lvbo_update(res, NULL, 1);
262                 }
263                 RETURN(ELDLM_LOCK_ABORTED);
264         }
265
266         /*
267          * This check is for lock taken in ofd_destroy_by_fid() that does
268          * not have l_glimpse_ast set. So the logic is: if there is a lock
269          * with no l_glimpse_ast set, this object is being destroyed already.
270          * Hence, if you are grabbing DLM locks on the server, always set
271          * non-NULL glimpse_ast (e.g., ldlm_request.c::ldlm_glimpse_ast()).
272          */
273         if (l->l_glimpse_ast == NULL) {
274                 /* We are racing with unlink(); just return -ENOENT */
275                 rep->lock_policy_res1 = ptlrpc_status_hton(-ENOENT);
276                 goto out;
277         }
278
279         /* Populate the gl_work structure.
280          * Grab additional reference on the lock which will be released in
281          * ldlm_work_gl_ast_lock() */
282         gl_work.gl_lock = LDLM_LOCK_GET(l);
283         /* The glimpse callback is sent to one single extent lock. As a result,
284          * the gl_work list is just composed of one element */
285         list_add_tail(&gl_work.gl_list, &gl_list);
286         /* There is actually no need for a glimpse descriptor when glimpsing
287          * extent locks */
288         gl_work.gl_desc = NULL;
289         /* the ldlm_glimpse_work structure is allocated on the stack */
290         gl_work.gl_flags = LDLM_GL_WORK_NOFREE;
291
292         rc = ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */
293
294         if (!list_empty(&gl_list))
295                 LDLM_LOCK_RELEASE(l);
296
297         lock_res(res);
298         *reply_lvb = *res_lvb;
299         unlock_res(res);
300
301 out:
302         LDLM_LOCK_RELEASE(l);
303
304         RETURN(ELDLM_LOCK_ABORTED);
305 }
306