/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

/*
 * 2003 - 2005 Copyright, Hewlett-Packard Development Company, LP.
 *
 * Developed under the sponsorship of the U.S. Government
 *     under Subcontract No. B514193
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <linux/lustre_dlm.h>
#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#endif

#include "ldlm_internal.h"

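/* Server-side list of blocked flock owners (struct ldlm_sleep_flock entries)
 * walked by ldlm_handle_flock_deadlock_check() to look for wait-for cycles,
 * and the interval a blocked client sleeps before issuing a deadlock check
 * RPC. */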
static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
static int ldlm_deadlock_timeout = 30 * HZ;

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * @pos:        the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * @n:          another &struct list_head to use as temporary storage
 * @head:       the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

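/* Two flock locks belong to the same owner iff both the process id and the
 * client NID recorded in the flock policy data match. */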
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.pid ==
                lock->l_policy_data.l_flock.pid) &&
               (new->l_policy_data.l_flock.nid ==
                lock->l_policy_data.l_flock.nid));
}

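/* Byte ranges [start, end] overlap when neither lies entirely before the
 * other; both ends are inclusive. */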
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

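/* Remove a lock from the local destroy list and destroy it.  In the
 * LDLM_FL_WAIT_NOREPROC (client-side reprocess) case the lock still holds a
 * reference, so drop it and mark the lock local-only so no CANCEL is sent
 * to the server. */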
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                   mode, flags);

        /* don't need to take the locks here because the lock
         * is on a local destroy list, not the resource list. */
        list_del_init(&lock->l_res_link);

        if (flags == LDLM_FL_WAIT_NOREPROC) {
                /* client side - set flags to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
                ldlm_lock_decref_internal(lock, mode);
        }

        ldlm_lock_destroy(lock);
        EXIT;
}

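/* Process a POSIX flock request against the resource's granted list.
 *
 * First, unless this is a reprocess (LDLM_FL_WAIT_NOREPROC) or an unlock
 * (LCK_NL), scan for granted locks of other owners that conflict: depending
 * on the flags the request then fails with -EAGAIN (LDLM_FL_BLOCK_NOWAIT),
 * returns the blocker's description (LDLM_FL_TEST_LOCK), or is queued on
 * the resource's waiting list with LDLM_FL_BLOCK_GRANTED set.
 *
 * Otherwise, walk this owner's granted locks and merge, shrink, split or
 * delete them so that the owner's locks remain non-overlapping once the
 * request is granted.  Locks made redundant are moved to a local destroy
 * list and destroyed at the end, after temporarily dropping the resource
 * lock. */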
int
ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                        ldlm_error_t *err, struct list_head *work_list)
{
        struct list_head destroy_list = LIST_HEAD_INIT(destroy_list);
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = res->lr_namespace;
        struct list_head *pos;
        struct list_head *tmp = NULL;
        struct ldlm_lock *lock;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2;
        ldlm_mode_t mode = req->l_req_mode;
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int rc = LDLM_ITER_CONTINUE;
        int i = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#x mode %u pid "LPU64" nid "LPU64" "
               "start "LPU64" end "LPU64"\n", *flags, mode,
               req->l_policy_data.l_flock.pid,
               req->l_policy_data.l_flock.nid,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        /* No blocking ASTs are sent for Posix file & record locks */
        req->l_blocking_ast = NULL;

        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                list_for_each(pos, &res->lr_granted) {
                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                tmp = pos;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                list_for_each(pos, &res->lr_granted) {
                        lock = list_entry(pos, struct ldlm_lock, l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!tmp)
                                        tmp = pos;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        /* deadlock detection is postponed until
                         * ldlm_flock_completion_ast(). */

                        *flags |= LDLM_FL_LOCK_CHANGED;

                        req->l_policy_data.l_flock.blocking_pid =
                                lock->l_policy_data.l_flock.pid;
                        req->l_policy_data.l_flock.blocking_nid =
                                lock->l_policy_data.l_flock.nid;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                list_move(&req->l_res_link, &destroy_list);
                                *err = -EAGAIN;
                                GOTO(out, rc = LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.nid =
                                        lock->l_policy_data.l_flock.nid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                list_move(&req->l_res_link, &destroy_list);
                                GOTO(out, rc = LDLM_ITER_STOP);
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                list_move(&req->l_res_link, &destroy_list);
                GOTO(out, rc = LDLM_ITER_STOP);
        }

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */
        pos = (tmp != NULL) ? tmp : &res->lr_granted;

        list_for_remaining_safe(pos, tmp, &res->lr_granted) {
                lock = list_entry(pos, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end != ~0))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                list_move(&lock->l_res_link, &destroy_list);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        list_move(&lock->l_res_link, &destroy_list);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the ns_lock, allocate the new lock,
                 * and restart processing this lock. */
                new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
                                        lock->l_granted_mode, NULL, NULL, NULL,
                                        NULL, 0);
                if (!new2) {
                        list_move(&req->l_res_link, &destroy_list);
                        *err = -ENOLCK;
                        GOTO(out, rc = LDLM_ITER_STOP);
                }

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.nid =
                        new->l_policy_data.l_flock.nid;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_get(lock->l_export);
                        list_add(&new2->l_export_chain,
                                 &new2->l_export->exp_ldlm_data.led_held_locks);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, pos, new2);
                LDLM_LOCK_PUT(new2);
                break;
        }

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        if (added) {
                list_move(&req->l_res_link, &destroy_list);
        } else {
                /* Add req to the granted queue before calling
                 * ldlm_reprocess_all() below. */
                list_del_init(&req->l_res_link);
                /* insert new lock before pos in the list. */
                ldlm_resource_add_lock(res, pos, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                struct list_head rpc_list =
                                                     LIST_HEAD_INIT(rpc_list);
                                int rc;
 restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);
                                unlock_res(res);
                                rc = ldlm_run_cp_ast_work(&rpc_list);
                                lock_res(res);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
        }

 out:
        if (!list_empty(&destroy_list)) {
                /* FIXME: major hack. when called from ldlm_lock_enqueue()
                 * the res and the lock are locked. When called from
                 * ldlm_reprocess_queue() the res is locked but the lock
                 * is not. */
                if (added && first_enq && res->lr_namespace->ns_client)
                        unlock_bitlock(req);

                unlock_res(res);

                CDEBUG(D_DLMTRACE, "Destroy locks:\n");

                list_for_each_safe(pos, tmp, &destroy_list) {
                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
                        ldlm_lock_dump(D_DLMTRACE, lock, ++i);
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                }

                if (added && first_enq && res->lr_namespace->ns_client)
                        lock_bitlock(req);

                lock_res(res);
        }

        RETURN(rc);
}

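/* One entry on ldlm_flock_waitq: the owner (pid/nid) of a blocked flock
 * request and the owner it is currently waiting on.  Following the
 * blocking_pid/blocking_nid links from entry to entry traces the wait-for
 * chain used for deadlock detection. */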
struct ldlm_sleep_flock {
        __u64 lsf_pid;
        __u64 lsf_nid;
        __u64 lsf_blocking_pid;
        __u64 lsf_blocking_nid;
        struct list_head lsf_list;
};

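/* Server-side handler for LDLM_FLK_DEADLOCK_CHK RPCs.  Depending on the
 * request flags it returns the pid/nid currently blocking the given lock
 * (LDLM_FL_GET_BLOCKING), walks ldlm_flock_waitq following the wait-for
 * chain and sets rq_status to -EDEADLOCK if the chain loops back to the
 * requester, adding the requester to the waitq if it is not already there
 * (LDLM_FL_DEADLOCK_CHK), and/or removes the requester's entry from the
 * waitq (LDLM_FL_DEADLOCK_DEL). */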
int
ldlm_handle_flock_deadlock_check(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_sleep_flock *lsf;
        struct list_head *pos;
        __u64 pid, nid, blocking_pid, blocking_nid;
        unsigned int flags;
        int rc = 0;
        ENTRY;

        req->rq_status = 0;

        dlm_req = lustre_swab_reqbuf(req, 0, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("bad request buffer for flock deadlock check\n");
                RETURN(-EFAULT);
        }

        flags = dlm_req->lock_flags;
        pid = dlm_req->lock_desc.l_policy_data.l_flock.pid;
        nid = dlm_req->lock_desc.l_policy_data.l_flock.nid;
        blocking_pid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_pid;
        blocking_nid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_nid;

        CDEBUG(D_DLMTRACE, "flags: 0x%x req: pid: "LPU64" nid "LPU64" "
               "blk: pid: "LPU64" nid: "LPU64"\n",
               dlm_req->lock_flags, pid, nid, blocking_pid, blocking_nid);

        if (flags & LDLM_FL_GET_BLOCKING) {
                struct ldlm_lock *lock;
                struct ldlm_reply *dlm_rep;
                int size = sizeof(*dlm_rep);

                lock = ldlm_handle2lock(&dlm_req->lock_handle1);
                if (!lock) {
                        CERROR("received deadlock check for unknown lock "
                               "cookie "LPX64" from client %s id %s\n",
                               dlm_req->lock_handle1.cookie,
                               req->rq_export->exp_client_uuid.uuid,
                               req->rq_peerstr);
                        req->rq_status = -ESTALE;
                        RETURN(0);
                }

                lock_res_and_lock(lock);
                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
                blocking_nid = lock->l_policy_data.l_flock.blocking_nid;
                unlock_res_and_lock(lock);

                rc = lustre_pack_reply(req, 1, &size, NULL);
                if (rc) {
                        CERROR("lustre_pack_reply failed: rc = %d\n", rc);
                        req->rq_status = rc;
                        RETURN(0);
                }

                dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*dlm_rep));
                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_pid =
                        blocking_pid;
                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_nid =
                        blocking_nid;
        } else {
                rc = lustre_pack_reply(req, 0, NULL, NULL);
        }

        if (flags & LDLM_FL_DEADLOCK_CHK) {
                __u64 orig_blocking_pid = blocking_pid;
                __u64 orig_blocking_nid = blocking_nid;
 restart:
                list_for_each(pos, &ldlm_flock_waitq) {
                        lsf = list_entry(pos,struct ldlm_sleep_flock,lsf_list);

                        /* We want to return a deadlock condition for the
                         * last lock on the waitq that created the deadlock
                         * situation. Posix verification suites expect this
                         * behavior. We'll stop if we haven't found a deadlock
                         * up to the point where the current process is queued
                         * to let the last lock on the queue that's in the
                         * deadlock loop detect the deadlock. In this case
                         * just update the blocking info. */
                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
                                lsf->lsf_blocking_pid = blocking_pid;
                                lsf->lsf_blocking_nid = blocking_nid;
                                break;
                        }

                        if ((lsf->lsf_pid != blocking_pid) ||
                            (lsf->lsf_nid != blocking_nid))
                                continue;

                        blocking_pid = lsf->lsf_blocking_pid;
                        blocking_nid = lsf->lsf_blocking_nid;

                        if (blocking_pid == pid && blocking_nid == nid) {
                                req->rq_status = -EDEADLOCK;
                                flags |= LDLM_FL_DEADLOCK_DEL;
                                break;
                        }

                        goto restart;
                }

                /* If we got all the way through the list then we're not on it. */
                if (pos == &ldlm_flock_waitq) {
                        OBD_ALLOC(lsf, sizeof(*lsf));
                        if (!lsf)
                                RETURN(-ENOSPC);

                        lsf->lsf_pid = pid;
                        lsf->lsf_nid = nid;
                        lsf->lsf_blocking_pid = orig_blocking_pid;
                        lsf->lsf_blocking_nid = orig_blocking_nid;
                        list_add_tail(&lsf->lsf_list, &ldlm_flock_waitq);
                }
        }

        if (flags & LDLM_FL_DEADLOCK_DEL) {
                list_for_each_entry(lsf, &ldlm_flock_waitq, lsf_list) {
                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
                                list_del_init(&lsf->lsf_list);
                                OBD_FREE(lsf, sizeof(*lsf));
                                break;
                        }
                }
        }

        RETURN(rc);
}

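/* Client side: send a single LDLM_FLK_DEADLOCK_CHK RPC for @lock to the
 * server behind @obd with the given flags.  If LDLM_FL_GET_BLOCKING is set
 * the reply carries the blocking pid/nid, which is copied back into the
 * lock's flock policy data. */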
int
ldlm_send_flock_deadlock_check(struct obd_device *obd, struct ldlm_lock *lock,
                               unsigned int flags)
{
        struct obd_import *imp;
        struct ldlm_request *body;
        struct ldlm_reply *reply;
        struct ptlrpc_request *req;
        int rc, size = sizeof(*body);
        ENTRY;

        CDEBUG(D_DLMTRACE, "obd: %p flags: 0x%x\n", obd, flags);

        imp = obd->u.cli.cl_import;
        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_FLK_DEADLOCK_CHK, 1,
                              &size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));

        if (flags & LDLM_FL_GET_BLOCKING) {
                size = sizeof(*reply);
                req->rq_replen = lustre_msg_size(1, &size);
        } else {
                req->rq_replen = lustre_msg_size(0, NULL);
        }

        rc = ptlrpc_queue_wait(req);
        if (rc != ELDLM_OK)
                GOTO(out, rc);

        if (flags & LDLM_FL_GET_BLOCKING) {
                reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                           lustre_swab_ldlm_reply);
                if (reply == NULL) {
                        CERROR("Can't unpack ldlm_reply\n");
                        GOTO(out, rc = -EPROTO);
                }

                lock->l_policy_data.l_flock.blocking_pid =
                        reply->lock_desc.l_policy_data.l_flock.blocking_pid;
                lock->l_policy_data.l_flock.blocking_nid =
                        reply->lock_desc.l_policy_data.l_flock.blocking_nid;

                CDEBUG(D_DLMTRACE, "LDLM_FL_GET_BLOCKING: pid: "LPU64" "
                       "nid: "LPU64" blk: pid: "LPU64" nid: "LPU64"\n",
                       lock->l_policy_data.l_flock.pid,
                       lock->l_policy_data.l_flock.nid,
                       lock->l_policy_data.l_flock.blocking_pid,
                       lock->l_policy_data.l_flock.blocking_nid);
        }

        rc = req->rq_status;
 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

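/* Client side: run the deadlock check for a blocked @lock.  First ask the
 * server holding the lock (@obd) for the blocking owner, performing the
 * deadlock check there as well when @obd is also the master server;
 * otherwise send a second RPC to @master_obd, which maintains the global
 * flock waitq, to do the check.  A NULL @obd means the caller only wants
 * this process removed from the waitq (LDLM_FL_DEADLOCK_DEL). */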
int
ldlm_flock_deadlock_check(struct obd_device *master_obd, struct obd_device *obd,
                          struct ldlm_lock *lock)
{
        unsigned int flags = 0;
        int rc;
        ENTRY;

        if (obd == NULL) {
                /* Delete this process from the sleeplock list. */
                flags = LDLM_FL_DEADLOCK_DEL;
                rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);
                RETURN(rc);
        }

        flags = LDLM_FL_GET_BLOCKING;
        if (obd == master_obd)
                flags |= LDLM_FL_DEADLOCK_CHK;

        rc = ldlm_send_flock_deadlock_check(obd, lock, flags);
        CDEBUG(D_DLMTRACE, "1st check: rc: %d flags: 0x%x\n", rc, flags);
        if (rc || (flags & LDLM_FL_DEADLOCK_CHK))
                RETURN(rc);

        CDEBUG(D_DLMTRACE, "about to send 2nd check: master: %p.\n",
               master_obd);

        flags = LDLM_FL_DEADLOCK_CHK;

        rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);

        CDEBUG(D_DLMTRACE, "2nd check: rc: %d flags: 0x%x\n", rc, flags);

        RETURN(rc);
}

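/* Per-waiter state passed to the interrupt handler while a flock enqueue
 * sleeps in ldlm_flock_completion_ast(); fwd_generation records the import
 * generation at the time the wait started. */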
struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

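/* Called when the wait for a blocked flock is interrupted (or a deadlock is
 * detected): mark the lock so it is not put on the LRU, drop the enqueue
 * reference and cancel the lock on the server. */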
static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        struct lustre_handle lockh;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* client side - set flag to prevent lock from being put on lru list */
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        ldlm_lock_decref_internal(lock, lock->l_req_mode);
        ldlm_lock2handle(lock, &lockh);
        ldlm_cli_cancel(&lockh);
        EXIT;
}

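/* Completion AST for client-side flock enqueues.  If the lock was granted
 * immediately, fall through to the merge/split reprocessing; otherwise sleep
 * until it is granted or destroyed, re-running the deadlock check every
 * ldlm_deadlock_timeout ticks and cancelling the lock when a cycle is
 * found. */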
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_flock_wait_data fwd;
        unsigned long irqflags;
        struct obd_device *obd;
        struct obd_device *master_obd = (struct obd_device *)lock->l_ast_data;
        struct obd_import *imp = NULL;
        ldlm_error_t err;
        int deadlock_checked = 0;
        int rc = 0;
        struct l_wait_info lwi;
        ENTRY;

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV)))
                goto granted;

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");

        ldlm_lock_dump(D_DLMTRACE, lock, 0);
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        CDEBUG(D_DLMTRACE, "flags: 0x%x master: %p obd: %p\n",
               flags, master_obd, obd);

        /* if this is a local lock, then there is no import */
        if (obd != NULL)
                imp = obd->u.cli.cl_import;

        if (imp != NULL) {
                spin_lock_irqsave(&imp->imp_lock, irqflags);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock_irqrestore(&imp->imp_lock, irqflags);
        }

        lwi = LWI_TIMEOUT_INTR(ldlm_deadlock_timeout, NULL,
                               ldlm_flock_interrupted_wait, &fwd);

 restart:
        rc = l_wait_event(lock->l_waitq,
                          ((lock->l_req_mode == lock->l_granted_mode) ||
                           lock->l_destroyed), &lwi);

        if (rc == -ETIMEDOUT) {
                deadlock_checked = 1;
                rc = ldlm_flock_deadlock_check(master_obd, obd, lock);
                if (rc == -EDEADLK)
                        ldlm_flock_interrupted_wait(&fwd);
                else {
                        CDEBUG(D_DLMTRACE, "lock: %p going back to sleep,\n",
                               lock);
                        goto restart;
                }
        } else {
                if (deadlock_checked)
                        ldlm_flock_deadlock_check(master_obd, NULL, lock);
        }

        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
        RETURN(rc);

 granted:
        LDLM_DEBUG(lock, "client-side enqueue granted");
        lock_res_and_lock(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* client side - set flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
        } else {
                int noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
                if (flags == 0)
                        wake_up(&lock->l_waitq);
        }

        unlock_res_and_lock(lock);
        RETURN(0);
}
757 }