4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ofd/ofd_dlm.c
38 * Author: Mike Pershin <tappro@whamcloud.com>
39 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42 #define DEBUG_SUBSYSTEM S_FILTER
44 #include "ofd_internal.h"
/*
 * Argument bundle handed to ofd_intent_cb() through the opaque pointer of
 * the interval-tree iterator (see interval_iterate_reverse() in
 * ofd_intent_policy()).
 *
 * NOTE(review): only part of this definition is visible in this excerpt.
 * Code further down also reads a "size" member and assigns a "liblustre"
 * member; confirm their declarations against the full file.
 */
46 struct ofd_intent_args {
/* Out-parameter: location where ofd_intent_cb() stores the lock it has
 * selected; the stored pointer carries a reference from LDLM_LOCK_GET. */
47 struct ldlm_lock **victim;
/*
 * Interval-tree iteration callback used by ofd_intent_policy().
 *
 * Looks at the locks attached to interval node n and records, through
 * arg->victim, a referenced lock whose extent start is the largest seen so
 * far. When a better candidate replaces an earlier one, the earlier
 * reference is dropped with LDLM_LOCK_RELEASE before the new one is taken.
 *
 * n:    interval node; cast to struct ldlm_interval to reach li_group.
 * args: pointer to a struct ofd_intent_args.
 *
 * Returns INTERVAL_ITER_STOP as soon as interval_high(n) <= arg->size,
 * otherwise INTERVAL_ITER_CONT.
 *
 * NOTE(review): several statements of this function are not visible in this
 * excerpt (for example what follows the l_export/exp_libclient test and the
 * end of the loop body); confirm those details against the full file.
 */
52 static enum interval_iter ofd_intent_cb(struct interval_node *n, void *args)
54 struct ldlm_interval *node = (struct ldlm_interval *)n;
55 struct ofd_intent_args *arg = args;
56 __u64 size = arg->size;
57 struct ldlm_lock **v = arg->victim;
58 struct ldlm_lock *lck;
60 /* If the interval is lower than the current file size, just break. */
61 if (interval_high(n) <= size)
62 return INTERVAL_ITER_STOP;
64 cfs_list_for_each_entry(lck, &node->li_group, l_sl_policy) {
65 /* Don't send glimpse ASTs to liblustre clients.
66 * They aren't listening for them, and they do
67 * entirely synchronous I/O anyways. */
68 if (lck->l_export == NULL || lck->l_export->exp_libclient)
75 *v = LDLM_LOCK_GET(lck);
76 } else if ((*v)->l_policy_data.l_extent.start <
77 lck->l_policy_data.l_extent.start) {
/* lck starts further out than the current candidate: swap the
 * held reference over to it. */
78 LDLM_LOCK_RELEASE(*v);
79 *v = LDLM_LOCK_GET(lck);
82 /* the same policy group - every lock has the
83 * same extent, so needn't do it any more */
87 return INTERVAL_ITER_CONT;
/*
 * Intent policy for extent lock requests carrying a glimpse intent.
 *
 * Visible flow:
 *  - pack a three-buffer reply (ptlrpc body, ldlm_reply, ost_lvb) and fetch
 *    the ldlm_reply and ost_lvb buffers from it;
 *  - run the processing policy of the resource on the new lock;
 *  - if the policy returned LDLM_ITER_CONTINUE, err becomes
 *    ELDLM_LOCK_ABORTED (after unlinking the lock) for exp_libclient exports
 *    or when the OBD_FAIL_LDLM_GLIMPSE fail point fires, and
 *    ELDLM_LOCK_REPLACED otherwise;
 *  - otherwise, when LDLM_FL_BLOCK_NOWAIT is set in flags, return
 *    ELDLM_LOCK_ABORTED without issuing a glimpse callback;
 *  - otherwise copy the resource LVB into the reply, walk the interval trees
 *    of the resource in reverse with ofd_intent_cb() to pick a lock l, call
 *    l->l_glimpse_ast(l, NULL), copy the LVB into the reply again, release l
 *    and return ELDLM_LOCK_ABORTED. If l has no l_glimpse_ast,
 *    rep->lock_policy_res1 is set to -ENOENT instead.
 *
 * ns:         asserted to equal ldlm_res_to_ns() of the lock resource.
 * lockp:      *lockp is the lock being processed.
 * req_cookie: used as a struct ptlrpc_request pointer; asserted non-NULL.
 * mode:       not referenced in the lines visible here.
 * flags:      tested for LDLM_FL_BLOCK_NOWAIT.
 *
 * Visible return sites yield req->rq_status (set to the rc of
 * lustre_pack_reply()) or ELDLM_LOCK_ABORTED.
 *
 * NOTE(review): this excerpt omits a number of lines (remaining parameters
 * and locals, several if/else guards, lock/unlock calls, the assignment of
 * arg.victim, goto labels). The summary above covers only what is visible;
 * verify branch structure and locking against the full file.
 */
90 int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
91 void *req_cookie, ldlm_mode_t mode, int flags,
94 struct ptlrpc_request *req = req_cookie;
95 struct ldlm_lock *lock = *lockp, *l = NULL;
96 struct ldlm_resource *res = lock->l_resource;
97 ldlm_processing_policy policy;
98 struct ost_lvb *res_lvb, *reply_lvb;
99 struct ldlm_reply *rep;
102 int tmpflags = 0, only_liblustre = 1;
103 struct ldlm_interval_tree *tree;
104 struct ofd_intent_args arg;
106 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
107 [DLM_LOCKREPLY_OFF] = sizeof(*rep),
108 [DLM_REPLY_REC_OFF] = sizeof(*reply_lvb)
113 policy = ldlm_get_processing_policy(res);
114 LASSERT(policy != NULL);
115 LASSERT(req != NULL);
/* The count of 3 matches the three buffer sizes initialised above. */
117 rc = lustre_pack_reply(req, 3, repsize, NULL);
119 RETURN(req->rq_status = rc);
121 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
122 LASSERT(rep != NULL);
124 reply_lvb = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
126 LASSERT(reply_lvb != NULL);
128 /* Call the extent policy function to see if our request can be
129 * granted, or is blocked.
130 * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse
131 * lock, and should not be granted if the lock will be blocked.
134 if (flags & LDLM_FL_BLOCK_NOWAIT) {
135 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5);
137 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK))
138 RETURN(ELDLM_LOCK_ABORTED);
141 LASSERT(ns == ldlm_res_to_ns(res));
143 rc = policy(lock, &tmpflags, 0, &err, NULL);
144 check_res_locked(res);
146 /* The lock met with no resistance; we're finished. */
147 if (rc == LDLM_ITER_CONTINUE) {
148 /* do not grant locks to the liblustre clients: they cannot
149 * handle ASTs robustly. We need to do this while still
150 * holding ns_lock to avoid the lock remaining on the res_link
151 * list (and potentially being added to l_pending_list by an
152 * AST) when we are going to drop this lock ASAP. */
153 if (lock->l_export->exp_libclient ||
154 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
155 ldlm_resource_unlink_lock(lock);
156 err = ELDLM_LOCK_ABORTED;
158 err = ELDLM_LOCK_REPLACED;
162 } else if (flags & LDLM_FL_BLOCK_NOWAIT) {
163 /* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse
164 * callback for glimpse size. The real size user will trigger
165 * the glimpse callback when necessary. */
167 RETURN(ELDLM_LOCK_ABORTED);
170 /* Do not grant any lock, but instead send GL callbacks. The extent
171 * policy nicely created a list of all PW locks for us. We will choose
172 * the highest of those which are larger than the size in the LVB, if
173 * any, and perform a glimpse callback. */
174 res_lvb = res->lr_lvb_data;
175 LASSERT(res_lvb != NULL);
176 *reply_lvb = *res_lvb;
179 * ->ns_lock guarantees that no new locks are granted, and,
180 * therefore, that res->lr_lvb_data cannot increase beyond the
181 * end of already granted lock. As a result, it is safe to
182 * check against "stale" reply_lvb->lvb_size value without
185 arg.size = reply_lvb->lvb_size;
187 arg.liblustre = &only_liblustre;
/* Visit each per-mode interval tree of the resource, highest
 * intervals first, letting ofd_intent_cb() choose the lock. */
189 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
190 tree = &res->lr_itree[idx];
191 if (tree->lit_mode == LCK_PR)
194 interval_iterate_reverse(tree->lit_root, ofd_intent_cb, &arg);
198 /* There were no PW locks beyond the size in the LVB; finished. */
200 if (only_liblustre) {
201 /* If we discovered a liblustre client with a PW lock,
202 * however, the LVB may be out of date! The LVB is
203 * updated only on glimpse (which we don't do for
204 * liblustre clients) and cancel (which the client
205 * obviously has not yet done). So if it has written
206 * data but kept the lock, the LVB is stale and needs
207 * to be updated from disk.
209 * Of course, this will all disappear when we switch to
210 * taking liblustre locks on the OST. */
211 ldlm_res_lvbo_update(res, NULL, 1);
213 RETURN(ELDLM_LOCK_ABORTED);
217 * This check is for lock taken in ofd_prepare_destroy() that does
218 * not have l_glimpse_ast set. So the logic is: if there is a lock
219 * with no l_glimpse_ast set, this object is being destroyed already.
220 * Hence, if you are grabbing DLM locks on the server, always set
221 * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
223 if (l->l_glimpse_ast == NULL) {
224 /* We are racing with unlink(); just return -ENOENT */
225 rep->lock_policy_res1 = -ENOENT;
229 LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
230 rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
233 *reply_lvb = *res_lvb;
/* Presumably drops the reference ofd_intent_cb() took with
 * LDLM_LOCK_GET - TODO confirm that arg.victim points at l. */
237 LDLM_LOCK_RELEASE(l);
239 RETURN(ELDLM_LOCK_ABORTED);