lustre/ldlm/ldlm_flock.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2003 Hewlett-Packard Development Company LP.
 *   Developed under the sponsorship of the US Government under
 *   Subcontract No. B514193
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

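/*
 * Flock locks are never put on the client LRU, so the ldlm_lock::l_lru
 * field can be reused as the linkage for the deadlock detection waitq.
 */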
#define l_flock_waitq   l_lru

/**
 * Wait queue for Posix lock deadlock detection, added with
 * ldlm_lock::l_flock_waitq.
 */
static CFS_LIST_HEAD(ldlm_flock_waitq);
/**
 * Lock protecting access to ldlm_flock_waitq.
 */
spinlock_t ldlm_flock_waitq_lock = SPIN_LOCK_UNLOCKED;

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * @pos:        the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * @n:          another &struct list_head to use as temporary storage
 * @head:       the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

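/* Two flock locks belong to the same owner when they were created by the
 * same process (matching pid) on the same client (matching export). */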
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.pid ==
                lock->l_policy_data.l_flock.pid) &&
               (new->l_export == lock->l_export));
}

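/* Two flock locks overlap when their [start, end] byte ranges intersect. */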
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

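/* Remove a flock lock from its resource list and destroy it.  On the client
 * side (LDLM_FL_WAIT_NOREPROC) the reference taken at enqueue time is dropped
 * and the lock is marked local-only so no CANCEL RPC is sent to the server. */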
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(list_empty(&lock->l_flock_waitq));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
                ldlm_lock_decref_internal(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

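/* Check whether granting 'req' would create a deadlock.  Starting from the
 * owner of the lock that blocks 'req', follow the chain of blocked owners
 * recorded on ldlm_flock_waitq; if the chain leads back to the owner of
 * 'req', the request would deadlock. */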
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
{
        struct obd_export *req_export = req->l_export;
        struct obd_export *blocking_export = blocking_lock->l_export;
        pid_t req_pid = req->l_policy_data.l_flock.pid;
        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
        struct ldlm_lock *lock;

        spin_lock(&ldlm_flock_waitq_lock);
restart:
        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
                    (lock->l_export != blocking_export))
                        continue;

                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
                blocking_export = (struct obd_export *)(long)
                        lock->l_policy_data.l_flock.blocking_export;
                if (blocking_pid == req_pid && blocking_export == req_export) {
                        spin_unlock(&ldlm_flock_waitq_lock);
                        return 1;
                }

                goto restart;
        }
        spin_unlock(&ldlm_flock_waitq_lock);

        return 0;
}

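/* Process a flock enqueue request 'req' against its resource: check for
 * conflicts with locks of other owners (blocking, -EAGAIN for non-blocking
 * requests, F_GETLK reporting, deadlock detection), then merge, split or
 * replace the owner's own overlapping locks before granting.  Called with
 * the resource lock held. */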
int
ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                        ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = res->lr_namespace;
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
               "\n", *flags, new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        req->l_policy_data.l_flock.blocking_pid =
                                lock->l_policy_data.l_flock.pid;
                        req->l_policy_data.l_flock.blocking_export =
                                (long)(void *)lock->l_export;

                        LASSERT(list_empty(&req->l_flock_waitq));
                        spin_lock(&ldlm_flock_waitq_lock);
                        list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
                        spin_unlock(&ldlm_flock_waitq_lock);

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request, take it off the
         * deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&req->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the ns_lock, allocate the new lock,
                 * and restart processing this lock. */
                new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                        lock->l_granted_mode, NULL, NULL, NULL,
                                        NULL, 0);
                if (!new2) {
                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
                        *err = -ENOLCK;
                        RETURN(LDLM_ITER_STOP);
                }

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_get(lock->l_export);
                        spin_lock(&new2->l_export->exp_ldlm_data.led_lock);
                        list_add(&new2->l_export_chain,
                                 &new2->l_export->exp_ldlm_data.led_held_locks);
                        spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_PUT(new2);
                break;
        }

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res(res);
                                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                                lock_res(res);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on req. Otherwise req could be freed
         * before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_OTHER, res);
        RETURN(LDLM_ITER_CONTINUE);
}

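/* Context passed to ldlm_flock_interrupted_wait() while a client thread
 * sleeps waiting for a blocked flock request to be granted. */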
struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

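/* Called when the wait for a blocked flock request is interrupted: remove
 * the lock from the deadlock detection waitq, drop the enqueue reference
 * and cancel the lock on the server. */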
static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        struct lustre_handle lockh;
        int rc;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* client side - set flag to prevent lock from being put on lru list */
        lock->l_flags |= LDLM_FL_CBPENDING;

        ldlm_lock_decref_internal(lock, lock->l_req_mode);
        ldlm_lock2handle(lock, &lockh);
        rc = ldlm_cli_cancel(&lockh);
        if (rc != ELDLM_OK)
                CERROR("ldlm_cli_cancel: %d\n", rc);

        EXIT;
}

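/* Client-side completion AST for flock enqueues.  If the request is blocked,
 * sleep until the lock is granted, the wait is interrupted, or the import is
 * invalidated; once granted, either fill in the F_GETLK result for test locks
 * or reprocess the lock to merge/split it with the owner's existing locks. */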
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_namespace *ns;
        cfs_flock_t *getlk = lock->l_ast_data;
        struct ldlm_flock_wait_data fwd;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        ldlm_error_t err;
        int rc = 0;
        struct l_wait_info lwi;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that the lock can go away. There is no
         * point in holding the lock even if the application still believes
         * it has it, since the server has already dropped it. This applies
         * only to granted locks. */
        lock_res_and_lock(lock);
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                unlock_res_and_lock(lock);
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);
                RETURN(0);
        }
        unlock_res_and_lock(lock);

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV)))
                goto granted;

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");

        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, then there is no import */
        if (obd != NULL)
                imp = obd->u.cli.cl_import;

        if (imp != NULL) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq,
                          ((lock->l_req_mode == lock->l_granted_mode) ||
                           lock->l_destroyed), &lwi);

        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
        RETURN(rc);

granted:
        /* Before the flock's completion AST gets here, the flock
         * can already have been freed by another thread.
         */
        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "already destroyed by another thread");
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");
        ns = lock->l_resource->lr_namespace;
        lock_res(lock->l_resource);

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, cfs_flock_type(getlk), LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        cfs_flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        cfs_flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        cfs_flock_set_type(getlk, F_UNLCK);
                }
                cfs_flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
                cfs_flock_set_start(getlk, (loff_t)lock->l_policy_data.l_flock.start);
                cfs_flock_set_end(getlk, (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                int noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
                if (flags == 0)
                        cfs_waitq_signal(&lock->l_waitq);
        }
        unlock_res(lock->l_resource);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

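/* Blocking AST for flock locks, called on the server when a lock is being
 * cancelled.  Simply removes the lock from the deadlock detection waitq. */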
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        struct ldlm_namespace *ns;
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        ns = lock->l_resource->lr_namespace;

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);
        RETURN(0);
}