/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <linux/lustre_dlm.h>
#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
#else
#include <liblustre.h>
#endif

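/* Two flock locks have the same owner if they were requested by the same
 * process (matching pid) over the same export (client connection). */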
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return ((new->l_data.l_flock.pid == lock->l_data.l_flock.pid) &&
                (new->l_export == lock->l_export));
}

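/* Closed-interval overlap test: [start1, end1] and [start2, end2]
 * intersect iff start1 <= end2 and end1 >= start2. */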
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return ((new->l_data.l_flock.start <= lock->l_data.l_flock.end) &&
                (new->l_data.l_flock.end >= lock->l_data.l_flock.start));
}

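/* Unlink a flock lock from its resource and destroy it.  During
 * client-side reprocessing (LDLM_FL_WAIT_NOREPROC) the local reference
 * must also be dropped, but no CANCEL RPC may be sent to the server, so
 * LDLM_FL_LOCAL_ONLY is set before the decref-and-cancel. */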
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, int flags)
{
        ENTRY;

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC) {
                /* client side */
                struct lustre_handle lockh;

                /* Set a flag to prevent us from sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY;

                ldlm_lock2handle(lock, &lockh);
                ldlm_lock_decref_and_cancel(&lockh, lock->l_granted_mode);
        }

        ldlm_lock_destroy(lock);
        EXIT;
}

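/* Apply a flock request against the locks already granted on the
 * resource, implementing POSIX record locking semantics.  A conflicting
 * lock held by a different owner blocks the request (or aborts it for
 * LDLM_FL_BLOCK_NOWAIT, or just reports it for LDLM_FL_TEST_LOCK, i.e.
 * F_GETLK).  Locks held by the same owner are merged, trimmed, or split
 * so that each byte of the file is covered by at most one lock per
 * owner.  An LCK_NL request is an unlock.
 *
 * Illustrative example: if an owner holds a PW lock over [0, 99] and
 * unlocks [40, 59], the lock is split into two locks covering [0, 39]
 * and [60, 99]. */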
int
ldlm_flock_enqueue(struct ldlm_lock **reqp, void *req_cookie, int *flags,
                   int first_enq, ldlm_error_t *err)
{
        struct ldlm_lock *req = *reqp;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = res->lr_namespace;
        struct list_head *tmp;
        struct list_head *ownlocks;
        ldlm_mode_t mode = req->l_req_mode;
        int added = 0;
        int overlaps = 0;
        ENTRY;

        CDEBUG(D_FLOCK, "flags: 0x%x pid: %d mode: %d start: %llu end: %llu\n",
               *flags, new->l_data.l_flock.pid, mode,
               req->l_data.l_flock.start, req->l_data.l_flock.end);

        *err = ELDLM_OK;

        /* No blocking ASTs are sent for record locks */
        req->l_blocking_ast = NULL;

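        /* When the request is being reprocessed on the client
         * (LDLM_FL_WAIT_NOREPROC) or is an unlock (LCK_NL), any conflicts
         * have already been resolved, so only locate the owner's first
         * granted lock.  Otherwise, scan all granted locks for a
         * conflicting lock held by a different owner. */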
        ownlocks = NULL;
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, *flags);
                                *err = ELDLM_LOCK_ABORTED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                req->l_granted_mode = lock->l_granted_mode;
                                req->l_data.l_flock.pid =
                                        lock->l_data.l_flock.pid;
                                req->l_data.l_flock.start =
                                        lock->l_data.l_flock.start;
                                req->l_data.l_flock.end =
                                        lock->l_data.l_flock.end;
                                ldlm_flock_destroy(req, *flags);
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (first_enq) {
                                /* XXX - add deadlock detection check here */
                        }

                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_CONTINUE);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                LASSERT(first_enq);
                req->l_granted_mode = req->l_req_mode;
                RETURN(LDLM_ITER_STOP);
        }

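        /* An unlock (LCK_NL) request never adds a granted lock of its own;
         * it only trims or removes the owner's existing locks below, so
         * treat it as already "added". */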
        added = (mode == LCK_NL);

        /* Scan the owner's existing locks, merging or splitting ranges as
         * needed, and find the insertion point for the new lock. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        for (tmp = ownlocks->next; ownlocks != &res->lr_granted;
             ownlocks = tmp, tmp = ownlocks->next) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

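                /* A lock of the same mode that overlaps or is adjacent to
                 * the new lock is coalesced with it: grow the range to the
                 * union of the two and drop the redundant lock. */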
                if (lock->l_granted_mode == mode) {
                        if (lock->l_data.l_flock.end <
                            (new->l_data.l_flock.start - 1))
                                continue;

                        if (lock->l_data.l_flock.start >
                            (new->l_data.l_flock.end + 1))
                                break;

                        if (lock->l_data.l_flock.start >
                            new->l_data.l_flock.start)
                                lock->l_data.l_flock.start =
                                        new->l_data.l_flock.start;
                        else
                                new->l_data.l_flock.start =
                                        lock->l_data.l_flock.start;

                        if (lock->l_data.l_flock.end <
                            new->l_data.l_flock.end)
                                lock->l_data.l_flock.end =
                                        new->l_data.l_flock.end;
                        else
                                new->l_data.l_flock.end =
                                        lock->l_data.l_flock.end;

                        if (added) {
                                ldlm_flock_destroy(lock, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

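                /* The existing lock has a different mode.  Skip it if the
                 * ranges do not overlap; otherwise the new lock overrides
                 * the overlapped bytes, so the existing lock must be
                 * trimmed, discarded, or split in two. */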
                if (lock->l_data.l_flock.end < new->l_data.l_flock.start)
                        continue;
                if (lock->l_data.l_flock.start > new->l_data.l_flock.end)
                        break;

                ++overlaps;

                if (new->l_data.l_flock.start <=
                    lock->l_data.l_flock.start) {
                        if (new->l_data.l_flock.end <
                            lock->l_data.l_flock.end) {
                                lock->l_data.l_flock.start =
                                        new->l_data.l_flock.end + 1;
                                break;
                        } else if (added) {
                                ldlm_flock_destroy(lock, *flags);
                        } else {
                                lock->l_data.l_flock.start =
                                        new->l_data.l_flock.start;
                                lock->l_data.l_flock.end =
                                        new->l_data.l_flock.end;
                                new = lock;
                                added = 1;
                        }
                        continue;
                }
                if (new->l_data.l_flock.end >= lock->l_data.l_flock.end) {
                        lock->l_data.l_flock.end =
                                new->l_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* If this is an F_UNLCK operation, we could avoid allocating
                 * a new lock and reuse the req lock passed in with the
                 * request.  But that would complicate reply processing, since
                 * updates to req are reflected in the reply; the client side
                 * must see the original lock data so that it can process the
                 * unlock properly. */

                /* XXX - if ldlm_lock_create() can sleep we would have to
                 * release the ns_lock, allocate the new lock, and restart
                 * processing this lock. */
                new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
                                        lock->l_granted_mode, NULL, NULL);
                if (!new2) {
                        /* LBUG for now */
                        LASSERT(0);
                        RETURN(-ENOMEM);
                }

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_data.l_flock.pid = new->l_data.l_flock.pid;
                new2->l_data.l_flock.start = lock->l_data.l_flock.start;
                new2->l_data.l_flock.end = new->l_data.l_flock.start - 1;
                lock->l_data.l_flock.start = new->l_data.l_flock.end + 1;
                new2->l_connh = lock->l_connh;
                if ((new2->l_export = lock->l_export) != NULL) {
                        list_add(&new2->l_export_chain,
                                 &new2->l_export->exp_ldlm_data.led_held_locks);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC) {
                        /* client side */
                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
                }

                /* insert new2 just before the existing lock */
                list_add_tail(&new2->l_res_link, ownlocks);
                LDLM_LOCK_PUT(new2);
                break;
        }

        if (added) {
                ldlm_flock_destroy(req, *flags);
        } else {
                /* insert the new lock just before ownlocks */
                new->l_granted_mode = new->l_req_mode;
                list_del_init(&new->l_res_link);
                list_add_tail(&new->l_res_link, ownlocks);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                if (req->l_completion_ast)
                        ldlm_add_ast_work_item(req, NULL, NULL, 0);

                /* The only problem with doing the reprocessing here is that
                 * the completion ASTs for newly granted locks will be sent
                 * before the unlock completion is sent.  That should not be
                 * an issue.  Also note that ldlm_flock_enqueue() will recurse,
                 * but only once, because there can't be unlock requests on
                 * the wait queue. */
                if ((mode == LCK_NL) && overlaps)
                        ldlm_reprocess_queue(res, &res->lr_waiting);
        }

        ldlm_resource_dump(res);

        RETURN(LDLM_ITER_CONTINUE);
}

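/* Called if the wait in ldlm_flock_completion_ast() is interrupted;
 * there is nothing to clean up here. */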
static void interrupted_flock_completion_wait(void *data)
{
}

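/* State handed to the interrupt callback while a client waits for a
 * blocked flock request to be granted. */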
struct flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

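/* Client-side completion callback for flock requests.  If the server has
 * queued the request (one of the LDLM_FL_BLOCK_* flags is set), sleep
 * until it is granted.  Once granted, either fill in the F_GETLK reply
 * for a test request, or rerun ldlm_flock_enqueue() locally with
 * LDLM_FL_WAIT_NOREPROC so that the granted lock is merged into (or
 * splits) the owner's existing locks. */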
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_namespace *ns;
        struct file_lock *getlk = data;
        struct flock_wait_data fwd;
        unsigned long irqflags;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        ldlm_error_t err;
        int rc = 0;
        struct l_wait_info lwi;
        ENTRY;

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (flags == 0) {
                wake_up(&lock->l_waitq);
                RETURN(0);
        }

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV)))
                goto granted;

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");

        ldlm_lock_dump(D_OTHER, lock);

        fwd.fwd_lock = lock;
        obd = class_conn2obd(lock->l_connh);

        /* if this is a local lock, then there is no import */
        if (obd != NULL)
                imp = obd->u.cli.cl_import;

        if (imp != NULL) {
                spin_lock_irqsave(&imp->imp_lock, irqflags);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock_irqrestore(&imp->imp_lock, irqflags);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, interrupted_flock_completion_wait,
                               &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq,
                          ((lock->l_req_mode == lock->l_granted_mode) ||
                           lock->l_destroyed), &lwi);

        LASSERT(!(lock->l_destroyed));

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:

        LDLM_DEBUG(lock, "client-side enqueue waking up");
        ns = lock->l_resource->lr_namespace;
        l_lock(&ns->ns_lock);

        /* ldlm_lock_enqueue() has already placed the lock on the granted
         * list. */
        list_del_init(&lock->l_res_link);

        if (getlk) {
                /* fcntl(F_GETLK) request */
                if (lock->l_granted_mode == LCK_PR)
                        getlk->fl_type = F_RDLCK;
                else if (lock->l_granted_mode == LCK_PW)
                        getlk->fl_type = F_WRLCK;
                else
                        getlk->fl_type = F_UNLCK;
                getlk->fl_pid = lock->l_data.l_flock.pid;
                getlk->fl_start = lock->l_data.l_flock.start;
                getlk->fl_end = lock->l_data.l_flock.end;
                /* ldlm_flock_destroy(lock); */
        } else {
                flags = LDLM_FL_WAIT_NOREPROC;
                /* We need to reprocess the lock to handle any merges or
                 * splits. */
                ldlm_flock_enqueue(&lock, NULL, &flags, 1, &err);
        }
        l_unlock(&ns->ns_lock);
        RETURN(0);
}

/* This function is only called on the client when a lock is aborted. */
int
ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *ld,
                        void *data, int flag)
{
        ENTRY;
        ldlm_lock_destroy(lock);
        RETURN(0);
}