/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <linux/lustre_dlm.h>
#include <linux/obd_support.h>
#include <linux/lustre_lib.h>

#include "ldlm_internal.h"

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
                            struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        list_for_each(tmp, queue) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If the lock doesn't overlap new_ex, skip it. */
                if (l_extent->end < new_ex->start ||
                    l_extent->start > new_ex->end)
                        continue;

                /* The locks conflict in their requested extents and we can't
                 * satisfy both, so ignore this one.  Either we will ping-pong
                 * this extent (we would regardless of what extent we granted)
                 * or the lock is unused and shouldn't limit our extent
                 * growth. */
                if (lock->l_req_extent.end >= req_start &&
                    lock->l_req_extent.start <= req_end)
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }
        EXIT;
}
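/*
 * Hypothetical worked example of the growth above (not from the original
 * source, values chosen for illustration): a client asks for a PW lock on
 * [4096, 8191] while another client holds a granted PW lock whose granted
 * and requested extent is [0, 1023].  new_ex starts as [0, ~0]; the granted
 * lock conflicts and overlaps new_ex, and its end (1023) is strictly below
 * req_start (4096), so new_ex->start becomes min(1023 + 1, 4096) = 1024.
 * The request is therefore grown to [1024, ~0] rather than [0, ~0], leaving
 * the already-granted extent untouched.
 */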

/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = ~0};

        ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
        ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}
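/*
 * Note on the flag set above: ldlm_process_extent_lock() below passes the
 * caller's *flags straight through to this policy, so an enqueue path that
 * sees LDLM_FL_LOCK_CHANGED on return knows it must reply with the
 * possibly-widened lock->l_policy_data.l_extent rather than the extent the
 * client originally requested.  A minimal hypothetical check (assumed
 * caller, not from this file):
 *
 *      if (flags & LDLM_FL_LOCK_CHANGED)
 *              reply_extent = lock->l_policy_data.l_extent;
 */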

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * Returns:
 * 0 if the lock is not compatible
 * 1 if the lock is compatible
 * 2 if this group lock is compatible and requires no further checking
 * negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int send_cbs, int *flags, ldlm_error_t *err)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        ENTRY;

        lockmode_verify(req_mode);

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (req == lock)
                        RETURN(compat);

                if (scan) {
                        /* We only get here if we are queuing a GROUP lock and
                           have met some incompatible one.  The idea is to
                           insert the GROUP lock past a compatible GROUP lock
                           in the waiting queue or, if there is none, in front
                           of the first non-GROUP lock. */
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* We hit a non-GROUP lock; there should be no
                                   more GROUP locks later on, so queue in front
                                   of the first non-GROUP lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                RETURN(0);
                        }
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                RETURN(0);
                        }
                        continue;
                }


                /* locks are compatible, overlap doesn't matter */
                if (lockmode_compat(lock->l_req_mode, req_mode)) {
                        /* non-group locks are compatible, overlap doesn't
                           matter */
                        if (req_mode != LCK_GROUP)
                                continue;

                        /* If we are trying to get a GROUP lock and there is
                           another one of this kind, we need to compare gids */
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                if (lock->l_req_mode == lock->l_granted_mode)
                                        RETURN(2);

                                /* If we are in nonblocking mode, return
                                   immediately */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }
                                /* If this group lock is compatible with another
                                 * group lock on the waiting list, they must be
                                 * together in the list, so they can be granted
                                 * at the same time.  Otherwise the later lock
                                 * can get stuck behind another, incompatible,
                                 * lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                /* Because 'lock' is not granted, we can stop
                                 * processing this queue and return immediately.
                                 * There is no need to check the rest of the
                                 * list. */
                                RETURN(0);
                        }
                }

                if (req_mode == LCK_GROUP &&
                    (lock->l_req_mode != lock->l_granted_mode)) {
                        scan = 1;
                        compat = 0;
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* We hit a non-GROUP lock; there should be no
                                   more GROUP locks later on, so queue in front
                                   of the first non-GROUP lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                RETURN(0);
                        }
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                RETURN(0);
                        }
                        continue;
                }

                if (lock->l_req_mode == LCK_GROUP) {
                        /* The compared lock is a GROUP lock and the requested
                         * one is PR/PW, so they are not compatible; the
                         * extent range does not matter */
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                compat = -EWOULDBLOCK;
                                goto destroylock;
                        } else {
                                *flags |= LDLM_FL_NO_TIMEOUT;
                        }
                } else if (lock->l_policy_data.l_extent.end < req_start ||
                           lock->l_policy_data.l_extent.start > req_end) {
                        /* if a non-group lock doesn't overlap, skip it */
                        continue;
                }

                if (!send_cbs)
                        RETURN(0);

                compat = 0;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, req, NULL, 0);
        }

        return(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy(req);
        *err = compat;
        RETURN(compat);
}
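/*
 * Illustrative sketch (assumed caller, not from this file) of how the return
 * value is interpreted when probing the granted queue without sending
 * blocking callbacks:
 *
 *      ldlm_error_t err = ELDLM_OK;
 *      int flags = 0, rc;
 *      rc = ldlm_extent_compat_queue(&res->lr_granted, req, 0, &flags, &err);
 *
 *      rc == 1  - no conflicting extent was found, req may be granted
 *      rc == 0  - at least one conflict exists, req must wait
 *      rc == 2  - req is a GROUP lock already compatible with a granted
 *                 GROUP lock of the same gid; no further checking is needed
 *      rc  < 0  - req was destroyed and *err set, e.g. -EWOULDBLOCK when
 *                 LDLM_FL_BLOCK_NOWAIT was passed in flags
 */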

/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - the caller has already initialized req->lr_tmp
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - the caller has NOT initialized req->lr_tmp, so we must do so here
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        int rc, rc2;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                LASSERT(res->lr_tmp != NULL);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0, flags,
                                              err);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0,
                                                      flags, err);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, NULL, 0, 1);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        LASSERT(res->lr_tmp == NULL);
        res->lr_tmp = &rpc_list;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1, flags, err);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2) {
                res->lr_tmp = NULL;
                goto grant;
        }

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, 1, flags, err);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */
        res->lr_tmp = NULL;

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL, 0, 0);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  That causes deadlock, because ASTs aren't
                 * sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                l_unlock(&res->lr_namespace->ns_lock);
                rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
                l_lock(&res->lr_namespace->ns_lock);
                if (rc == -ERESTART)
                        GOTO(restart, -ERESTART);
                *flags |= LDLM_FL_BLOCK_GRANTED;
        }
        rc = 0;
out:
        res->lr_tmp = NULL;
        RETURN(rc);
}
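/*
 * Recap of the two call paths described above (a summary, no new behaviour):
 * ldlm_lock_enqueue() calls with first_enq = 1 and res->lr_tmp unset, so
 * blocking ASTs are collected in the local rpc_list and pushed out via
 * ldlm_run_ast_work(); ldlm_reprocess_queue() calls with first_enq = 0 after
 * it has already set res->lr_tmp and sent blocking ASTs, and it uses the
 * LDLM_ITER_STOP / LDLM_ITER_CONTINUE return value to decide whether to keep
 * walking the waiting queue.
 */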

/* When a lock is cancelled by a client, the KMS (known minimum size) may
 * change if this is the "highest lock".  This function returns the new KMS
 * value.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        l_lock(&res->lr_namespace->ns_lock);

        /* don't let another thread in ldlm_extent_shift_kms race in just after
         * we finish and take our lock into account in its calculation of the
         * kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        GOTO(out, kms = old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        GOTO(out, kms);
 out:
        l_unlock(&res->lr_namespace->ns_lock);
        return kms;
}
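/*
 * Worked example with illustrative values (not from the original source):
 * old_kms is 4096, the lock being cancelled covers [0, 4095], and the only
 * other granted lock covers [0, 1023].  The cancelled lock is marked
 * LDLM_FL_KMS_IGNORE and skipped, the remaining lock's end (1023) is below
 * old_kms, so the new KMS becomes 1023 + 1 = 1024.  Had any other granted
 * lock still reached offset 4095 or beyond, the KMS would have stayed at
 * 4096.
 */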