/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <linux/lustre_dlm.h>
#include <linux/obd_support.h>
#include <linux/lustre_lib.h>

#include "ldlm_internal.h"

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
                            struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        list_for_each(tmp, queue) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (l_extent->end < new_ex->start ||
                    l_extent->start > new_ex->end)
                        continue;

                /* The lock conflicts with our requested extent and we can't
                 * satisfy both locks, so ignore it.  Either we will ping-pong
                 * this extent (we would regardless of what extent we granted)
                 * or the lock is unused and shouldn't limit our extent
                 * growth. */
                if (lock->l_req_extent.end >= req_start &&
                    lock->l_req_extent.start <= req_end)
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end is strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end + 1,
                                                    req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }
        EXIT;
}
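
/* Illustrative worked example (not part of the original source): a PW request
 * for [4096, 8191] scans a granted queue holding a PW lock on extent [0, 1023]
 * (requested [0, 1023]) and another on [65536, ~0] (requested
 * [65536, 131071]).  Starting from new_ex = [0, ~0], the first lock limits
 * downward growth to new_ex->start = 1023 + 1 = 1024, and the second limits
 * upward growth to new_ex->end = 65536 - 1 = 65535, so the request is grown
 * to [1024, 65535].  Were there more than 32 conflicting PW/CW locks, the end
 * would additionally be capped at req_start + LDLM_MAX_GROWN_EXTENT. */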

/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = ~0};

        ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
        ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}
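
/* Continuing the illustrative example above: for the request on [4096, 8191],
 * new_ex comes back as [1024, 65535], which differs from the requested
 * extent, so LDLM_FL_LOCK_CHANGED is set and the client is granted the grown
 * extent rather than the one it asked for. */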

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * Returns:
 *  0 if the lock is not compatible
 *  1 if the lock is compatible
 *  2 if this group lock is compatible and requires no further checking
 *  a negative error, such as -EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        ENTRY;

        lockmode_verify(req_mode);

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (req == lock)
                        RETURN(compat);

                if (scan) {
                        /* We only get here if we are queuing a GROUP lock
                         * and have met an incompatible one.  The idea is to
                         * insert the GROUP lock after a compatible GROUP lock
                         * (same gid) in the waiting queue or, if there is
                         * none, in front of the first non-GROUP lock. */
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* We hit a non-GROUP lock; there should be no
                                 * more GROUP locks later on, so queue in front
                                 * of the first non-GROUP lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                RETURN(0);
                        }
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                RETURN(0);
                        }
                        continue;
                }

                if (lockmode_compat(lock->l_req_mode, req_mode)) {
                        /* Non-GROUP locks are compatible; overlap doesn't
                         * matter. */
                        if (req_mode != LCK_GROUP)
                                continue;

                        /* If we are trying to get a GROUP lock and there is
                         * another one of this kind, we need to compare gids. */
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                if (lock->l_req_mode == lock->l_granted_mode)
                                        RETURN(2);

                                /* If we are in nonblocking mode, return
                                 * immediately. */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }
                                /* If this group lock is compatible with another
                                 * group lock on the waiting list, they must be
                                 * together in the list, so they can be granted
                                 * at the same time.  Otherwise the later lock
                                 * can get stuck behind another, incompatible,
                                 * lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                /* Because 'lock' is not granted, we can stop
                                 * processing this queue and return immediately.
                                 * There is no need to check the rest of the
                                 * list. */
                                RETURN(0);
                        }
                }

                if (req_mode == LCK_GROUP &&
                    (lock->l_req_mode != lock->l_granted_mode)) {
                        scan = 1;
                        compat = 0;
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* We hit a non-GROUP lock; there should be no
                                 * more GROUP locks later on, so queue in front
                                 * of the first non-GROUP lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                RETURN(0);
                        }
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                RETURN(0);
                        }
                        continue;
                }
                if (lock->l_req_mode == LCK_GROUP) {
                        /* If the compared lock is GROUP, then the requested
                         * lock is PR/PW or a GROUP lock with a different gid;
                         * either way it is not compatible, and the extent
                         * range does not matter. */
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                compat = -EWOULDBLOCK;
                                goto destroylock;
                        } else {
                                *flags |= LDLM_FL_NO_TIMEOUT;
                        }
                } else if (lock->l_policy_data.l_extent.end < req_start ||
                           lock->l_policy_data.l_extent.start > req_end) {
                        /* If a non-GROUP lock doesn't overlap, skip it. */
                        continue;
                }

                if (!work_list)
                        RETURN(0);

                compat = 0;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, req, work_list);
        }

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy(req);
        *err = compat;
        RETURN(compat);
}
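
/* Illustrative example of the return contract (not part of the original
 * source): if req is an LCK_GROUP request with gid 1 and the granted queue
 * already holds a granted GROUP lock with gid 1, this returns 2 and the
 * caller may grant immediately without checking the waiting queue.  If the
 * granted GROUP lock instead has gid 2 and req carries LDLM_FL_BLOCK_NOWAIT,
 * req is destroyed and -EWOULDBLOCK is returned. */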

/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - the caller has already initialized req->lr_tmp
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - the caller has NOT initialized req->lr_tmp, so we must do it here
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        int rc, rc2;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);
                ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_bl_ast_work(&rpc_list);
                lock_res(res);
                if (rc == -ERESTART)
                        GOTO(restart, -ERESTART);
                *flags |= LDLM_FL_BLOCK_GRANTED;
        }
        rc = 0;
out:
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* Mark this lock ignored so that another thread entering
         * ldlm_extent_shift_kms just after we finish doesn't take our
         * lock into account in its calculation of the kms. */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}
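
/* Illustrative worked example (not part of the original source): suppose the
 * granted queue holds lock A on [0, 4095] and lock B on [8192, 12287], with
 * old_kms = 12288, and the client cancels B.  B is flagged LDLM_FL_KMS_IGNORE
 * and skipped; A's extent end (4095) is below old_kms, so the new KMS is
 * 4095 + 1 = 4096.  Had any remaining lock's extent end been >= old_kms,
 * old_kms would have been returned unchanged. */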