lustre/ldlm/ldlm_flock.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2003 Hewlett-Packard Development Company LP.
 *   Developed under the sponsorship of the US Government under
 *   Subcontract No. B514193
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

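/* Reuse the lock's l_lru list head to link waiting flock locks onto the
 * global deadlock detection waitq declared below. */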
#define l_flock_waitq   l_lru

static CFS_LIST_HEAD(ldlm_flock_waitq);
spinlock_t ldlm_flock_waitq_lock = SPIN_LOCK_UNLOCKED;

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * @pos:        the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * @n:          another &struct list_head to use as temporary storage
 * @head:       the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
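/* Example usage (see ldlm_process_flock_lock() below): 'ownlocks' is first
 * positioned at this owner's first granted lock, then the remaining granted
 * locks are walked with list_for_remaining_safe() so that entries may be
 * merged away (deleted) while the scan is in progress. */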

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.pid ==
                lock->l_policy_data.l_flock.pid) &&
               (new->l_export == lock->l_export));
}

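/* Return non-zero if the byte ranges of the two locks overlap. Both start
 * and end are inclusive, so adjacent but disjoint ranges do not overlap. */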
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(list_empty(&lock->l_flock_waitq));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
                ldlm_lock_decref_internal(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

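/* Walk the deadlock detection waitq, following the chain of "who is blocked
 * by whom" starting from the owner of 'blocking_lock'. If the chain leads
 * back to the owner of 'req', granting 'req' would create a cycle, so report
 * a deadlock by returning 1; otherwise return 0. */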
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
{
        struct obd_export *req_export = req->l_export;
        struct obd_export *blocking_export = blocking_lock->l_export;
        pid_t req_pid = req->l_policy_data.l_flock.pid;
        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
        struct ldlm_lock *lock;

        spin_lock(&ldlm_flock_waitq_lock);
restart:
        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
                    (lock->l_export != blocking_export))
                        continue;

                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
                blocking_export = (struct obd_export *)(long)
                        lock->l_policy_data.l_flock.blocking_export;
                if (blocking_pid == req_pid && blocking_export == req_export) {
                        spin_unlock(&ldlm_flock_waitq_lock);
                        return 1;
                }

                goto restart;
        }
        spin_unlock(&ldlm_flock_waitq_lock);

        return 0;
}

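/* Process a POSIX (fcntl/flock) lock request against the resource's lock
 * lists: check for conflicts with locks held by other owners (queueing or
 * failing the request as the flags dictate), then merge, split or replace
 * the locks already owned by the same process so that the granted list stays
 * non-overlapping. Called both at first enqueue and at reprocessing time;
 * an unlock request arrives here as a request with mode LCK_NL. */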
int
ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                        ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = res->lr_namespace;
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
               "\n", *flags, new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * POSIX file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        req->l_policy_data.l_flock.blocking_pid =
                                lock->l_policy_data.l_flock.pid;
                        req->l_policy_data.l_flock.blocking_export =
                                (long)(void *)lock->l_export;

                        LASSERT(list_empty(&req->l_flock_waitq));
                        spin_lock(&ldlm_flock_waitq_lock);
                        list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
                        spin_unlock(&ldlm_flock_waitq_lock);

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request take it off of the
         * deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&req->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the ns_lock, allocate the new lock,
                 * and restart processing this lock. */
                new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                        lock->l_granted_mode, NULL, NULL, NULL,
                                        NULL, 0);
                if (!new2) {
                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
                        *err = -ENOLCK;
                        RETURN(LDLM_ITER_STOP);
                }

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_get(lock->l_export);
                        spin_lock(&new2->l_export->exp_ldlm_data.led_lock);
                        list_add(&new2->l_export_chain,
                                 &new2->l_export->exp_ldlm_data.led_held_locks);
                        spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_PUT(new2);
                break;
        }

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res(res);
                                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                                lock_res(res);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on req. Otherwise req could be freed
         * before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_OTHER, res);
        RETURN(LDLM_ITER_CONTINUE);
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

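/* Called when the wait for a blocked flock lock is interrupted: take the
 * lock off the deadlock detection waitq, drop the reference held for the
 * wait and cancel the lock on the server. */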
static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        struct lustre_handle lockh;
        int rc;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* client side - set flag to prevent lock from being put on lru list */
        lock->l_flags |= LDLM_FL_CBPENDING;

        ldlm_lock_decref_internal(lock, lock->l_req_mode);
        ldlm_lock2handle(lock, &lockh);
        rc = ldlm_cli_cancel(&lockh);
        if (rc != ELDLM_OK)
                CERROR("ldlm_cli_cancel: %d\n", rc);

        EXIT;
}

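/* Client-side completion handler for flock enqueues. If the server replied
 * that the lock is blocked, sleep until it is granted (or destroyed). Once
 * granted, either fill in the F_GETLK result for a test lock or reprocess
 * the granted list to merge/split against this owner's existing locks. */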
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_namespace *ns;
        cfs_flock_t *getlk = lock->l_ast_data;
        struct ldlm_flock_wait_data fwd;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        ldlm_error_t err;
        int rc = 0;
        struct l_wait_info lwi;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. There is no point in
         * holding the lock even if the app still believes it has it, since the
         * server already dropped it anyway. This only applies to granted
         * locks. */
        lock_res_and_lock(lock);
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                unlock_res_and_lock(lock);
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);
                RETURN(0);
        }
        unlock_res_and_lock(lock);

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV)))
                goto granted;

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");

        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, then there is no import */
        if (obd != NULL)
                imp = obd->u.cli.cl_import;

        if (imp != NULL) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq,
                          ((lock->l_req_mode == lock->l_granted_mode) ||
                           lock->l_destroyed), &lwi);

        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
        RETURN(rc);

granted:
        /* Before the flock's completion AST gets here, the lock may already
         * have been freed by another thread. */
        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "already destroyed by another thread");
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");
        ns = lock->l_resource->lr_namespace;
        lock_res(lock->l_resource);

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the mode
                 * in the lock changes we can decref the appropriate refcount. */
                ldlm_flock_destroy(lock, cfs_flock_type(getlk), LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        cfs_flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        cfs_flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        cfs_flock_set_type(getlk, F_UNLCK);
                }
                cfs_flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
                cfs_flock_set_start(getlk, (loff_t)lock->l_policy_data.l_flock.start);
                cfs_flock_set_end(getlk, (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                int noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
                if (flags == 0)
                        cfs_waitq_signal(&lock->l_waitq);
        }
        unlock_res(lock->l_resource);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

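/* Blocking callback for flock locks; only invoked at cancel time. The lock
 * just needs to be removed from the deadlock detection waitq. */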
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        struct ldlm_namespace *ns;
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        ns = lock->l_resource->lr_namespace;

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);
        RETURN(0);
}