/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2011, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

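/* The l_lru linkage is reused as the deadlock detection waitq hook: flock
 * locks are never put on the namespace LRU (ldlm_flock_destroy() below
 * asserts that this list head is empty), so the field is otherwise unused
 * for them. */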
#define l_flock_waitq   l_lru

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
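
/* An illustrative use of the macro above, mirroring how it is used in
 * ldlm_process_flock_lock() below: resume scanning a resource's granted
 * list from a previously found position while tolerating removal of the
 * current entry:
 *
 *      pos = ownlocks;
 *      list_for_remaining_safe(pos, n, &res->lr_granted) {
 *              lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
 *              ...
 *      }
 */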

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}
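
/* A worked example of the inclusive-range overlap test above: an existing
 * lock on [100, 199] and a new request for [150, 250] overlap because
 * new->start (150) <= lock->end (199) and new->end (250) >= lock->start
 * (100). Adjacent but disjoint ranges such as [0, 99] and [100, 199] do
 * not overlap. */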
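
/* These two helpers maintain the per-export list of flock requests that are
 * blocked waiting on another owner's lock. ldlm_flock_deadlock() below walks
 * these lists to look for wait-for cycles. Both are no-ops on the client,
 * where l_export is NULL. */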
static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
                                            struct ldlm_lock *lock)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        LASSERT(cfs_list_empty(&req->l_flock_waitq));
        cfs_write_lock(&req->l_export->exp_flock_wait_lock);

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                class_export_get(lock->l_export);

        cfs_list_add_tail(&req->l_flock_waitq,
                          &req->l_export->exp_flock_wait_list);
        cfs_write_unlock(&req->l_export->exp_flock_wait_lock);
}

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        cfs_write_lock(&req->l_export->exp_flock_wait_lock);
        if (!cfs_list_empty(&req->l_flock_waitq)) {
                cfs_list_del_init(&req->l_flock_waitq);

                class_export_put(req->l_policy_data.l_flock.blocking_export);
                req->l_policy_data.l_flock.blocking_owner = 0;
                req->l_policy_data.l_flock.blocking_export = NULL;
        }
        cfs_write_unlock(&req->l_export->exp_flock_wait_lock);
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(cfs_list_empty(&lock->l_flock_waitq));

        cfs_list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* when we reach here the resource is held under
                 * lock_res_and_lock(), so we must call the nolock version
                 * of ldlm_lock_decref_internal() */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

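/**
 * Deadlock detection: starting from the lock that blocks req, walk the
 * per-export lists of blocked flock requests, each time following the
 * blocking owner of the waiter just found. If the chain leads back to
 * req's own owner and export, granting req would complete a wait-for
 * cycle, so deadlock is reported (return 1).
 */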
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        struct obd_export *bl_exp_new;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
        struct ldlm_lock *lock;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
restart:
        cfs_read_lock(&bl_exp->exp_flock_wait_lock);
        cfs_list_for_each_entry(lock, &bl_exp->exp_flock_wait_list,
                                l_flock_waitq) {
                struct ldlm_flock *flock = &lock->l_policy_data.l_flock;

                /* we want to find a waiter owned by the same client and
                 * the same process as the blocking lock */
                if (flock->owner != bl_owner)
                        continue;

                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                cfs_read_unlock(&bl_exp->exp_flock_wait_lock);
                class_export_put(bl_exp);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }

                goto restart;
        }
        cfs_read_unlock(&bl_exp->exp_flock_wait_lock);
        class_export_put(bl_exp);

        return 0;
}

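/**
 * Process a flock request against the lists of a resource.
 *
 * A summary of the logic below: scan the granted list for conflicts with
 * other owners, failing the request (LDLM_FL_BLOCK_NOWAIT), reporting the
 * conflicting lock (LDLM_FL_TEST_LOCK), checking for deadlock, or queueing
 * the request on the waiting list as the flags dictate. If there is no
 * conflict, the request is merged with adjoining or overlapping locks of
 * the same owner and mode, and same-owner locks of a different mode are
 * trimmed or split so the new range replaces them.
 */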
int
ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        cfs_list_t *tmp;
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#x owner "LPU64" pid %u mode %u start "LPU64
               " end "LPU64"\n", *flags, new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * POSIX file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        ldlm_flock_blocking_link(req, lock);
                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request take it off the
         * deadlock detection waitq. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
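                        /* For example, an existing lock on [0, 99] and a new
                         * lock on [100, 199] of the same mode adjoin (the new
                         * start is old end + 1) and are merged into [0, 199].
                         * The extra tests keep "end + 1" from wrapping when
                         * end == OBD_OBJECT_EOF and "start - 1" from wrapping
                         * when start == 0. */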
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */
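                /* For example: if this owner holds [0, 99] and the new
                 * request covers [30, 59], the holder must be left with
                 * [0, 29] (carried by new2 below) and [60, 99] (carried by
                 * the original lock). */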

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                        lock->l_granted_mode, &null_cbs,
                                        NULL, 0);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 was created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only possible case for a client-side call into the
                 * flock policy function is ldlm_flock_completion_ast(),
                 * which always carries the LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on req. Otherwise req could be freed
         * before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection waitq. */
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on lru list */
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        EXIT;
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: flags
 * \param data     [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        cfs_flock_t                    *getlk = lock->l_ast_data;
        struct obd_device              *obd;
        struct obd_import              *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. There is no point
         * in holding the lock even if the application still believes it has
         * it, since the server has already dropped it anyway. This applies
         * to granted locks only. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* the MDS granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: the lock got granted, wake up the waiter */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                cfs_spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                cfs_spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        /* take lock off the deadlock detection waitq. */
        ldlm_flock_blocking_unlink(lock);

        lock_res_and_lock(lock);
        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        cfs_list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, cfs_flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        cfs_flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        cfs_flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        cfs_flock_set_type(getlk, F_UNLCK);
                }
                cfs_flock_set_pid(getlk,
                                  (pid_t)lock->l_policy_data.l_flock.pid);
                cfs_flock_set_start(getlk,
                                    (loff_t)lock->l_policy_data.l_flock.start);
                cfs_flock_set_end(getlk,
                                  (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                int noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

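/**
 * Blocking callback for flock locks.
 *
 * This is only ever invoked with LDLM_CB_CANCELING (asserted below), so
 * all it has to do is take the cancelled lock off the deadlock detection
 * waitq.
 */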
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection waitq. */
        ldlm_flock_blocking_unlink(lock);
        RETURN(0);
}

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code: old clients did not know about the owner field and
         * relied solely on the pid for ownership. The owner field was
         * introduced in LU-104 (Lustre 2.1, April 2011). */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}