/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002-2004 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_class.h>
#include <libcfs/list.h>
#include "ldlm_internal.h"

extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
extern struct list_head ldlm_namespace_list;

extern struct semaphore ldlm_namespace_lock;
static struct semaphore ldlm_ref_sem;
static int ldlm_refcount;

/* LDLM state */

static struct ldlm_state *ldlm_state;

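/* Round a timeout up to the next whole second; the waiting-locks timer is
 * armed at one-second granularity to avoid floods of timer firings (see
 * __ldlm_add_waiting_lock() below). */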
inline cfs_time_t round_timeout(cfs_time_t timeout)
{
        return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}

/* timeout for initial callback (AST) reply */
static inline unsigned int ldlm_get_rq_timeout(unsigned int ldlm_timeout,
                                               unsigned int obd_timeout)
{
        unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);

        return timeout < 1 ? 1 : timeout;
}

#ifdef __KERNEL__
/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
static spinlock_t waiting_locks_spinlock;   /* BH lock (timer) */
static struct list_head waiting_locks_list;
static cfs_timer_t waiting_locks_timer;

static struct expired_lock_thread {
        cfs_waitq_t               elt_waitq;
        int                       elt_state;
        int                       elt_dump;
        struct list_head          elt_expired_locks;
} expired_lock_thread;
#endif

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2

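/* Shared work queue feeding the blocking-AST (ldlm_bl) threads; blp_lock
 * protects blp_list, and blp_waitq wakes an idle thread when a new work
 * item is queued. */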
struct ldlm_bl_pool {
        spinlock_t              blp_lock;
        struct list_head        blp_list;
        cfs_waitq_t             blp_waitq;
        atomic_t                blp_num_threads;
        struct completion       blp_comp;
};

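/* One unit of work for a blocking-AST thread: either a single lock to
 * cancel (blwi_lock) or a batch of cancels (blwi_head/blwi_count). */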
struct ldlm_bl_work_item {
        struct list_head        blwi_entry;
        struct ldlm_namespace   *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock        *blwi_lock;
        struct list_head        blwi_head;
        int                     blwi_count;
};

#ifdef __KERNEL__

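/* Check, under the BH-safe spinlock, whether the expired-lock thread has
 * work queued for it. */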
static inline int have_expired_locks(void)
{
        int need_to_run;

        ENTRY;
        spin_lock_bh(&waiting_locks_spinlock);
        need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
        spin_unlock_bh(&waiting_locks_spinlock);

        RETURN(need_to_run);
}

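/* Main loop of the "ldlm_elt" thread: sleep until waiting_locks_callback()
 * hands over expired locks, then fail the exports holding them, dumping
 * debug logs first if elt_dump was set. */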
static int expired_lock_main(void *arg)
{
        struct list_head *expired = &expired_lock_thread.elt_expired_locks;
        struct l_wait_info lwi = { 0 };
        int do_dump;

        ENTRY;
        cfs_daemonize("ldlm_elt");

        expired_lock_thread.elt_state = ELT_READY;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);

        while (1) {
                l_wait_event(expired_lock_thread.elt_waitq,
                             have_expired_locks() ||
                             expired_lock_thread.elt_state == ELT_TERMINATE,
                             &lwi);

                spin_lock_bh(&waiting_locks_spinlock);
                if (expired_lock_thread.elt_dump) {
                        spin_unlock_bh(&waiting_locks_spinlock);

                        /* from waiting_locks_callback, but not in timer */
                        libcfs_debug_dumplog();
                        libcfs_run_lbug_upcall(__FILE__,
                                                "waiting_locks_callback",
                                                expired_lock_thread.elt_dump);

                        spin_lock_bh(&waiting_locks_spinlock);
                        expired_lock_thread.elt_dump = 0;
                }

                do_dump = 0;

                while (!list_empty(expired)) {
                        struct obd_export *export;
                        struct ldlm_lock *lock;

                        lock = list_entry(expired->next, struct ldlm_lock,
                                          l_pending_chain);
                        if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
                                CERROR("free lock on elt list %p\n", lock);
                                LBUG();
                        }
                        list_del_init(&lock->l_pending_chain);
                        if ((void *)lock->l_export < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock->l_export >= LP_POISON) {
                                CERROR("lock with free export on elt list %p\n",
                                       lock->l_export);
                                lock->l_export = NULL;
                                LDLM_ERROR(lock, "free export");
                                continue;
                        }
                        export = class_export_get(lock->l_export);
                        spin_unlock_bh(&waiting_locks_spinlock);

                        do_dump++;
                        class_fail_export(export);
                        class_export_put(export);
                        spin_lock_bh(&waiting_locks_spinlock);
                }
                spin_unlock_bh(&waiting_locks_spinlock);

                if (do_dump && obd_dump_on_eviction) {
                        CERROR("dump the log upon eviction\n");
                        libcfs_debug_dumplog();
                }

                if (expired_lock_thread.elt_state == ELT_TERMINATE)
                        break;
        }

        expired_lock_thread.elt_state = ELT_STOPPED;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        RETURN(0);
}

static int ldlm_add_waiting_lock(struct ldlm_lock *lock);

/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
        struct ldlm_lock *lock, *last = NULL;

repeat:
        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);

                if (cfs_time_after(lock->l_callback_timeout, cfs_time_current()) ||
                    (lock->l_req_mode == LCK_GROUP))
                        break;

                if (ptlrpc_check_suspend()) {
                        /* There is a case when we hold a lock from one MDS
                         * while talking to another MDS; if the second MDS is
                         * being recovered we can easily get here, so suspend
                         * the timeouts. bug 6019 */

                        LDLM_ERROR(lock, "recharge timeout: %s@%s nid %s ",
                                   lock->l_export->exp_client_uuid.uuid,
                                   lock->l_export->exp_connection->c_remote_uuid.uuid,
                                   libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));

                        list_del_init(&lock->l_pending_chain);
                        spin_unlock_bh(&waiting_locks_spinlock);
                        ldlm_add_waiting_lock(lock);
                        goto repeat;
                }

                /* if the timeout overlaps the activation time of suspended
                 * timeouts then extend it to give the client a chance to
                 * reconnect */
                if (cfs_time_before(cfs_time_sub(lock->l_callback_timeout,
                                                 cfs_time_seconds(obd_timeout)/2),
                                    ptlrpc_suspend_wakeup_time())) {
                        LDLM_ERROR(lock, "extend timeout due to recovery: %s@%s nid %s ",
                                   lock->l_export->exp_client_uuid.uuid,
                                   lock->l_export->exp_connection->c_remote_uuid.uuid,
                                   libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));

                        list_del_init(&lock->l_pending_chain);
                        spin_unlock_bh(&waiting_locks_spinlock);
                        ldlm_add_waiting_lock(lock);
                        goto repeat;
                }

                LDLM_ERROR(lock, "lock callback timer expired: evicting client "
                           "%s@%s nid %s\n",
                           lock->l_export->exp_client_uuid.uuid,
                           lock->l_export->exp_connection->c_remote_uuid.uuid,
                           libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));

                last = lock;

                list_del(&lock->l_pending_chain);
                list_add(&lock->l_pending_chain,
                         &expired_lock_thread.elt_expired_locks);
        }

        if (!list_empty(&expired_lock_thread.elt_expired_locks)) {
                if (obd_dump_on_timeout)
                        expired_lock_thread.elt_dump = __LINE__;

                cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        }

        /*
         * Make sure the timer will fire again if we have any locks
         * left.
         */
        if (!list_empty(&waiting_locks_list)) {
                cfs_time_t timeout_rounded;
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
                timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        }
        spin_unlock_bh(&waiting_locks_spinlock);
}

/*
 * Indicate that we're waiting for a client to call us back cancelling a given
 * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
 * timer to fire appropriately.  (We round up to the next second, to avoid
 * floods of timer firings during periods of high lock contention and traffic).
 *
 * Called with the namespace lock held.
 */
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        cfs_time_t timeout_rounded;

        if (!list_empty(&lock->l_pending_chain))
                return 0;

        lock->l_callback_timeout = cfs_time_add(cfs_time_current(),
                                                cfs_time_seconds(obd_timeout)/2);

        timeout_rounded = round_timeout(lock->l_callback_timeout);

        if (cfs_time_before(timeout_rounded, cfs_timer_deadline(&waiting_locks_timer)) ||
            !cfs_timer_is_armed(&waiting_locks_timer)) {
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        }
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
        return 1;
}

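/* Locked wrapper around __ldlm_add_waiting_lock() that refuses to queue a
 * destroyed lock (bug 5653). */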
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        int ret;

        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));

        spin_lock_bh(&waiting_locks_spinlock);
        if (lock->l_destroyed) {
                static cfs_time_t next;
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
                if (cfs_time_after(cfs_time_current(), next)) {
                        next = cfs_time_shift(14400);
                        libcfs_debug_dumpstack(NULL);
                }
                return 0;
        }

        ret = __ldlm_add_waiting_lock(lock);
        spin_unlock_bh(&waiting_locks_spinlock);

        LDLM_DEBUG(lock, "%sadding to wait list",
                   ret == 0 ? "not re-" : "");
        return ret;
}

/*
 * Remove a lock from the pending list, likely because its cancellation
 * callback arrived without incident.  This adjusts the lock-timeout timer
 * if needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
 *
 * Called with namespace lock held.
 */
int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        struct list_head *list_next;

        if (list_empty(&lock->l_pending_chain))
                return 0;

        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        cfs_timer_disarm(&waiting_locks_timer);
                } else {
                        struct ldlm_lock *next;
                        next = list_entry(list_next, struct ldlm_lock,
                                          l_pending_chain);
                        cfs_timer_arm(&waiting_locks_timer,
                                      round_timeout(next->l_callback_timeout));
                }
        }
        list_del_init(&lock->l_pending_chain);

        return 1;
}

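/* Locked wrapper around __ldlm_del_waiting_lock(); a no-op on the client,
 * which keeps no waiting-locks list. */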
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        int ret;

        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);
        ret = __ldlm_del_waiting_lock(lock);
        spin_unlock_bh(&waiting_locks_spinlock);

        LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
        return ret;
}

/*
 * Prolong the lock's callback timeout.
 *
 * Called with namespace lock held.
 */
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
{
        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);

        if (list_empty(&lock->l_pending_chain)) {
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
                return 0;
        }

        __ldlm_del_waiting_lock(lock);
        __ldlm_add_waiting_lock(lock);
        spin_unlock_bh(&waiting_locks_spinlock);

        LDLM_DEBUG(lock, "refreshed");
        return 1;
}

#else /* !__KERNEL__ */

static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
        RETURN(1);
}

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}

int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}
#endif /* __KERNEL__ */

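/* An AST to this client has failed; report the eviction on the console,
 * optionally dump the debug log, and fail the export. */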
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                            const char *ast_type)
{
        struct ptlrpc_connection *conn = lock->l_export->exp_connection;
        char                     *str = libcfs_nid2str(conn->c_peer.nid);

        LCONSOLE_ERROR_MSG(0x138, "A client on nid %s was evicted from "
                           "service %s.\n", str,
                           lock->l_export->exp_obd->obd_name);

        LCONSOLE_ERROR_MSG(0x012, "Lock %s callback to %s timed out for "
                           "resource %d\n", ast_type,
                           obd_export_nid2str(lock->l_export), rc);

        if (obd_dump_on_timeout)
                libcfs_debug_dumplog();
        class_fail_export(lock->l_export);
}

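/* Decide how to react to a failed AST RPC: on timeout, either just cancel
 * the lock (liblustre client, or a cancel crossed the AST on the wire) or
 * evict the client; other errors cancel the lock and return -ERESTART so
 * that the caller restarts its reprocessing. */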
static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
{
        lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;

        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                LASSERT(lock->l_export);
                if (lock->l_export->exp_libclient) {
                        LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
                                   " timeout, just cancelling lock", ast_type,
                                   libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else if (lock->l_flags & LDLM_FL_CANCEL) {
                        LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
                                   "cancel was received (AST reply lost?)",
                                   ast_type, libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else {
                        ldlm_del_waiting_lock(lock);
                        ldlm_failed_ast(lock, rc, ast_type);
                }
        } else if (rc) {
                if (rc == -EINVAL)
                        LDLM_DEBUG(lock, "client (nid %s) returned %d"
                               " from %s AST - normal race",
                               libcfs_nid2str(peer.nid),
                               req->rq_repmsg ?
                               lustre_msg_get_status(req->rq_repmsg) : -1,
                               ast_type);
                else
                        LDLM_ERROR(lock, "client (nid %s) returned %d "
                                   "from %s AST", libcfs_nid2str(peer.nid),
                                   (req->rq_repmsg != NULL) ?
                                   lustre_msg_get_status(req->rq_repmsg) : 0,
                                   ast_type);
                ldlm_lock_cancel(lock);
                /* Server-side AST functions are called from ldlm_reprocess_all,
                 * which needs to be told to please restart its reprocessing. */
                rc = -ERESTART;
        }

        return rc;
}

/*
 * ->l_blocking_ast() method for server-side locks. This is invoked when a
 * newly enqueued server lock conflicts with the given one.
 *
 * Sends a blocking AST RPC to the client owning that lock, and arms the
 * timeout timer to wait for the client's response.
 */
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                       [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int instant_cancel = 0, rc = 0;
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        LASSERT(lock);
        if (lock->l_export->exp_obd->obd_recovering != 0) {
                LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
                ldlm_lock_dump(D_ERROR, lock, 0);
        }

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size,
                              NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        lock_res(lock->l_resource);
        if (lock->l_granted_mode != lock->l_req_mode) {
                /* this blocking AST will be communicated as part of the
                 * completion AST instead */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                RETURN(0);
        }

        if (lock->l_destroyed) {
                /* What's the point? */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                instant_cancel = 1;

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_desc = *desc;
        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);

        LDLM_DEBUG(lock, "server preparing blocking AST");

        ptlrpc_req_set_repsize(req, 1, NULL);
        if (instant_cancel) {
                unlock_res(lock->l_resource);
                ldlm_lock_cancel(lock);
        } else {
                LASSERT(lock->l_granted_mode == lock->l_req_mode);
                ldlm_add_waiting_lock(lock);
                unlock_res(lock->l_resource);
        }

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout);

        if (lock->l_export && lock->l_export->exp_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                     LDLM_BL_CALLBACK - LDLM_FIRST_OPC);

        if (unlikely(instant_cancel)) {
                rc = ptl_send_rpc(req, 1);
        } else {
                rc = ptlrpc_queue_wait(req);
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
        }
        if (rc != 0) {
                /* If the client cancelled the lock but the cancel has not
                 * been received yet, we need to update the LVB to have the
                 * proper attributes cached. */
                if (rc == -EINVAL)
                        ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1);
                rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
        }

        ptlrpc_req_finished(req);

        /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
        if (!rc && instant_cancel)
                rc = -ERESTART;

        RETURN(rc);
}

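/* ->l_completion_ast() method for server-side locks. Notifies the client
 * that its enqueue has been granted, shipping the current LVB if the
 * resource has one, and starts the cancel timer if a blocking AST has
 * already been sent. */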
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        struct timeval granted_time;
        long total_enqueue_wait;
        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int rc = 0, buffers = 2, instant_cancel = 0;
        ENTRY;

        LASSERT(lock != NULL);

        do_gettimeofday(&granted_time);
        total_enqueue_wait = cfs_timeval_sub(&granted_time,
                                             &lock->l_enqueued_time, NULL);

        if (total_enqueue_wait / 1000000 > obd_timeout)
                LDLM_ERROR(lock, "enqueue wait took %luus from %lu",
                           total_enqueue_wait, lock->l_enqueued_time.tv_sec);

        lock_res_and_lock(lock);
        if (lock->l_resource->lr_lvb_len) {
                size[DLM_REQ_REC_OFF] = lock->l_resource->lr_lvb_len;
                buffers = 3;
        }
        unlock_res_and_lock(lock);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK, buffers,
                              size, NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);

        if (buffers == 3) {
                void *lvb;

                lvb = lustre_msg_buf(req->rq_reqmsg, DLM_REQ_REC_OFF,
                                     lock->l_resource->lr_lvb_len);
                lock_res_and_lock(lock);
                memcpy(lvb, lock->l_resource->lr_lvb_data,
                       lock->l_resource->lr_lvb_len);
                unlock_res_and_lock(lock);
        }

        LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
                   total_enqueue_wait);

        ptlrpc_req_set_repsize(req, 1, NULL);

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout);

        /* We only send real blocking ASTs after the lock is granted */
        lock_res_and_lock(lock);
        if (lock->l_flags & LDLM_FL_AST_SENT) {
                body->lock_flags |= LDLM_FL_AST_SENT;

                /* We might get here before ldlm_handle_enqueue has set the
                 * LDLM_FL_CANCEL_ON_BLOCK flag, in which case we put this
                 * lock on the waiting list anyway.  This is safe: similar
                 * code in ldlm_handle_enqueue will still call
                 * ldlm_lock_cancel(), which not only cancels the lock but
                 * also removes it from the waiting list. */
                if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                        unlock_res_and_lock(lock);
                        ldlm_lock_cancel(lock);
                        instant_cancel = 1;
                        lock_res_and_lock(lock);
                } else {
                        /* start the lock-timeout clock */
                        ldlm_add_waiting_lock(lock);
                }
        }
        unlock_res_and_lock(lock);

        if (lock->l_export && lock->l_export->exp_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                     LDLM_CP_CALLBACK - LDLM_FIRST_OPC);

        rc = ptlrpc_queue_wait(req);
        if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "completion");

        ptlrpc_req_finished(req);

        /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
        if (!rc && instant_cancel)
                rc = -ERESTART;

        RETURN(rc);
}

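/* ->l_glimpse_ast() method for server-side locks. Asks the client holding
 * the lock for its current view of the LVB and folds the reply into the
 * resource via ldlm_res_lvbo_update(). */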
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                       [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int rc = 0;
        ENTRY;

        LASSERT(lock != NULL);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK, 2, size,
                              NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        ldlm_lock2desc(lock, &body->lock_desc);

        lock_res_and_lock(lock);
        size[REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
        unlock_res_and_lock(lock);
        res = lock->l_resource;
        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout);

        if (lock->l_export && lock->l_export->exp_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                     LDLM_GL_CALLBACK - LDLM_FIRST_OPC);

        rc = ptlrpc_queue_wait(req);
        if (rc == -ELDLM_NO_LOCK_DATA)
                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
        else if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
        else
                rc = ldlm_res_lvbo_update(res, req->rq_repmsg,
                                          REPLY_REC_OFF, 1);
        ptlrpc_req_finished(req);
        RETURN(rc);
}

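/* On replay, look through the export's list of held locks for one whose
 * remote handle matches the client's; returns a referenced lock or NULL. */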
static struct ldlm_lock *
find_existing_lock(struct obd_export *exp,
                   const struct lustre_handle *remote_hdl)
{
        struct list_head *iter;

        spin_lock(&exp->exp_ldlm_data.led_lock);
        list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
                struct ldlm_lock *lock;
                lock = list_entry(iter, struct ldlm_lock, l_export_chain);
                if (lock->l_remote_handle.cookie == remote_hdl->cookie) {
                        LDLM_LOCK_GET(lock);
                        spin_unlock(&exp->exp_ldlm_data.led_lock);
                        return lock;
                }
        }
        spin_unlock(&exp->exp_ldlm_data.led_lock);
        return NULL;
}

#ifdef __KERNEL__
extern unsigned long long lu_time_stamp_get(void);
#else
#define lu_time_stamp_get() time(NULL)
#endif

/*
 * Main server-side entry point into LDLM. This is called by ptlrpc service
 * threads to carry out client lock enqueueing requests.
 */
int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                         struct ptlrpc_request *req,
                         const struct ldlm_request *dlm_req,
                         const struct ldlm_callback_suite *cbs)
{
        struct ldlm_reply *dlm_rep;
        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
        int rc = 0;
        __u32 flags;
        ldlm_error_t err = ELDLM_OK;
        struct ldlm_lock *lock = NULL;
        void *cookie = NULL;
        ENTRY;

        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");

        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
        flags = dlm_req->lock_flags;

        LASSERT(req->rq_export);

        if (req->rq_export->exp_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                     LDLM_ENQUEUE - LDLM_FIRST_OPC);

        if (unlikely(dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE ||
                     dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE)) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request type %d",
                          dlm_req->lock_desc.l_resource.lr_type);
                GOTO(out, rc = -EFAULT);
        }

        if (unlikely(dlm_req->lock_desc.l_req_mode <= LCK_MINMODE ||
                     dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE ||
                     dlm_req->lock_desc.l_req_mode &
                     (dlm_req->lock_desc.l_req_mode-1))) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d",
                          dlm_req->lock_desc.l_req_mode);
                GOTO(out, rc = -EFAULT);
        }

        if (req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) {
                if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
                             LDLM_PLAIN)) {
                        DEBUG_REQ(D_ERROR, req,
                                  "PLAIN lock request from IBITS client?");
                        GOTO(out, rc = -EPROTO);
                }
        } else if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
                            LDLM_IBITS)) {
                DEBUG_REQ(D_ERROR, req,
                          "IBITS lock request from unaware client?");
                GOTO(out, rc = -EPROTO);
        }

#if 0
        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
           against server's _CONNECT_SUPPORTED flags? (I don't want to use
           ibits for mgc/mgs) */

        /* INODEBITS_INTEROP: Perform conversion from plain lock to
         * inodebits lock if client does not support them. */
        if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) &&
            (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
                dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
                dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
                        MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                if (dlm_req->lock_desc.l_req_mode == LCK_PR)
                        dlm_req->lock_desc.l_req_mode = LCK_CR;
        }
#endif

        if (unlikely(flags & LDLM_FL_REPLAY)) {
                lock = find_existing_lock(req->rq_export,
                                          &dlm_req->lock_handle[0]);
                if (lock != NULL) {
                        DEBUG_REQ(D_HA, req, "found existing lock cookie "LPX64,
                                  lock->l_handle.h_cookie);
                        GOTO(existing_lock, rc = 0);
                }
        }

        /* The lock's callback data might be set in the policy function */
        lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
                                dlm_req->lock_desc.l_resource.lr_type,
                                dlm_req->lock_desc.l_req_mode,
                                cbs->lcs_blocking, cbs->lcs_completion,
                                cbs->lcs_glimpse, NULL, 0);

        if (!lock)
                GOTO(out, rc = -ENOMEM);

        do_gettimeofday(&lock->l_enqueued_time);
        lock->l_remote_handle = dlm_req->lock_handle[0];
        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");

        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
        /* Don't enqueue a lock onto the export if it has already
         * been evicted.  Cancel it now instead. (bug 3822) */
        if (req->rq_export->exp_failed) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                GOTO(out, rc = -ENOTCONN);
        }
        lock->l_export = class_export_get(req->rq_export);
        spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
        list_add(&lock->l_export_chain,
                 &lock->l_export->exp_ldlm_data.led_held_locks);
        spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);

existing_lock:

        if (flags & LDLM_FL_HAS_INTENT) {
                /* In this case, the reply buffer is allocated deep in
                 * local_lock_enqueue by the policy function. */
                cookie = req;
        } else {
                int buffers = 2;

                lock_res_and_lock(lock);
                if (lock->l_resource->lr_lvb_len) {
                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                        buffers = 3;
                }
                unlock_res_and_lock(lock);

                if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                        GOTO(out, rc = -ENOMEM);

                rc = lustre_pack_reply(req, buffers, size, NULL);
                if (rc)
                        GOTO(out, rc);
        }

        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                lock->l_req_extent = lock->l_policy_data.l_extent;

        err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
        if (err)
                GOTO(out, err);

        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                 sizeof(*dlm_rep));
        dlm_rep->lock_flags = flags;

        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
        ldlm_lock2handle(lock, &dlm_rep->lock_handle);

        /* We never send a blocking AST until the lock is granted, but
         * we can tell it right now */
        lock_res_and_lock(lock);

        /* Now take into account flags to be inherited from original lock
           request both in reply to client and in our own lock flags. */
        dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
        lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;

        /* Don't move a pending lock onto the export if it has already
         * been evicted.  Cancel it now instead. (bug 5683) */
        if (unlikely(req->rq_export->exp_failed ||
                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                rc = -ENOTCONN;
        } else if (lock->l_flags & LDLM_FL_AST_SENT) {
                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                if (lock->l_granted_mode == lock->l_req_mode) {
                        /*
                         * Only cancel the lock if it was granted, because it
                         * would otherwise be destroyed immediately and never
                         * be granted in the future, causing timeouts on the
                         * client.  A lock that was not granted will be
                         * cancelled immediately after the completion AST is
                         * sent.
                         */
                        if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                                unlock_res_and_lock(lock);
                                ldlm_lock_cancel(lock);
                                lock_res_and_lock(lock);
                        } else
                                ldlm_add_waiting_lock(lock);
                }
        }
        /* Make sure we never ever grant usual metadata locks to liblustre
           clients */
        if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||
            dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&
             req->rq_export->exp_libclient) {
                if (unlikely(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
                             !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
                        CERROR("Granting sync lock to libclient. "
                               "req fl %d, rep fl %d, lock fl %d\n",
                               dlm_req->lock_flags, dlm_rep->lock_flags,
                               lock->l_flags);
                        LDLM_ERROR(lock, "sync lock");
                        if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
                                struct ldlm_intent *it;
                                it = lustre_msg_buf(req->rq_reqmsg,
                                                    DLM_INTENT_IT_OFF,
                                                    sizeof(*it));
                                if (it != NULL) {
                                        CERROR("This is intent %s ("LPU64")\n",
                                               ldlm_it2str(it->opc), it->opc);
                                }
                        }
                }
        }

        unlock_res_and_lock(lock);

        EXIT;
 out:
        req->rq_status = err;
        if (req->rq_reply_state == NULL) {
                err = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc == 0)
                        rc = err;
                req->rq_status = rc;
        }

        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
        if (lock) {
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply "
                           "(err=%d, rc=%d)", err, rc);

                lock_res_and_lock(lock);
                if (rc == 0) {
                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                        if (size[DLM_REPLY_REC_OFF] > 0) {
                                void *lvb = lustre_msg_buf(req->rq_repmsg,
                                                       DLM_REPLY_REC_OFF,
                                                       size[DLM_REPLY_REC_OFF]);
                                LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                         req, lock);

                                memcpy(lvb, lock->l_resource->lr_lvb_data,
                                       size[DLM_REPLY_REC_OFF]);
                        }
                } else {
                        ldlm_resource_unlink_lock(lock);
                        ldlm_lock_destroy_nolock(lock);
                }
                unlock_res_and_lock(lock);

                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                        ldlm_reprocess_all(lock->l_resource);

                LDLM_LOCK_PUT(lock);
        }

        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
                          lock, rc);

        return rc;
}

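/* Unpack (and byte-swap if needed) the wire ldlm_request, then pass it to
 * ldlm_handle_enqueue0() along with the export's namespace and the given
 * callback suite. */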
int ldlm_handle_enqueue(struct ptlrpc_request *req,
                        ldlm_completion_callback completion_callback,
                        ldlm_blocking_callback blocking_callback,
                        ldlm_glimpse_callback glimpse_callback)
{
        int rc;
        struct ldlm_request *dlm_req;
        struct ldlm_callback_suite cbs = {
                .lcs_completion = completion_callback,
                .lcs_blocking   = blocking_callback,
                .lcs_glimpse    = glimpse_callback
        };

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
                                     sizeof *dlm_req, lustre_swab_ldlm_request);
        if (dlm_req != NULL) {
                rc = ldlm_handle_enqueue0(req->rq_export->exp_obd->obd_namespace,
                                          req, dlm_req, &cbs);
        } else {
                CERROR("Can't unpack dlm_req\n");
                rc = -EFAULT;
        }
        return rc;
}

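/* Server-side handler for LDLM_CONVERT: try to convert an existing lock
 * to the requested mode, replying with EDEADLOCK in rq_status if the
 * conversion cannot be granted. */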
int ldlm_handle_convert0(struct ptlrpc_request *req,
                         const struct ldlm_request *dlm_req)
{
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
        int rc;
        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
        ENTRY;

        if (req->rq_export && req->rq_export->exp_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                     LDLM_CONVERT - LDLM_FIRST_OPC);

        rc = lustre_pack_reply(req, 2, size, NULL);
        if (rc) {
                CERROR("out of memory\n");
                RETURN(-ENOMEM);
        }
        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                 sizeof(*dlm_rep));
        dlm_rep->lock_flags = dlm_req->lock_flags;

        lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
        if (!lock) {
                req->rq_status = EINVAL;
        } else {
                void *res = NULL;

                LDLM_DEBUG(lock, "server-side convert handler START");

                do_gettimeofday(&lock->l_enqueued_time);
                res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                        &dlm_rep->lock_flags);
                if (res) {
                        if (ldlm_del_waiting_lock(lock))
                                LDLM_DEBUG(lock, "converted waiting lock");
                        req->rq_status = 0;
                } else {
                        req->rq_status = EDEADLOCK;
                }
        }

        if (lock) {
                if (!req->rq_status)
                        ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side convert handler END");
                LDLM_LOCK_PUT(lock);
        } else
                LDLM_DEBUG_NOLOCK("server-side convert handler END");

        RETURN(0);
}

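/* Unpack and swab the convert request, then call ldlm_handle_convert0(). */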
int ldlm_handle_convert(struct ptlrpc_request *req)
{
        int rc;
        struct ldlm_request *dlm_req;

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof *dlm_req,
                                     lustre_swab_ldlm_request);
        if (dlm_req != NULL) {
                rc = ldlm_handle_convert0(req, dlm_req);
        } else {
                CERROR("Can't unpack dlm_req\n");
                rc = -EFAULT;
        }
        return rc;
}

/* Cancel all the locks whose handles are packed into the ldlm_request */
int ldlm_request_cancel(struct ptlrpc_request *req,
                        const struct ldlm_request *dlm_req, int first)
{
        struct ldlm_resource *res, *pres = NULL;
        struct ldlm_lock *lock;
        int i, count, done = 0;
        ENTRY;

        LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, "
                          "starting at %d", dlm_req->lock_count, first);
        count = dlm_req->lock_count ? dlm_req->lock_count : 1;
        if (first >= count)
                RETURN(0);

        /* There are no locks on the server at replay time, so skip lock
         * cancelling to make replay tests pass. */
        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
                RETURN(0);

        for (i = first; i < count; i++) {
                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
                if (!lock) {
                        LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
                                          "lock (cookie "LPU64")",
                                          dlm_req->lock_handle[i].cookie);
                        continue;
                }

                res = lock->l_resource;
                done++;

                if (res != pres) {
                        if (pres != NULL) {
                                ldlm_reprocess_all(pres);
                                ldlm_resource_putref(pres);
                        }
                        if (res != NULL) {
                                ldlm_resource_getref(res);
                                ldlm_res_lvbo_update(res, NULL, 0, 1);
                        }
                        pres = res;
                }
                ldlm_lock_cancel(lock);
                LDLM_LOCK_PUT(lock);
        }
        if (pres != NULL) {
                ldlm_reprocess_all(pres);
                ldlm_resource_putref(pres);
        }
        LDLM_DEBUG_NOLOCK("server-side cancel handler END");
        RETURN(done);
}

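/* Server-side handler for LDLM_CANCEL: cancel every lock named in the
 * request via ldlm_request_cancel() and send the reply. */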
int ldlm_handle_cancel(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        int rc;
        ENTRY;

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("bad request buffer for cancel\n");
                RETURN(-EFAULT);
        }

        if (req->rq_export && req->rq_export->exp_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                     LDLM_CANCEL - LDLM_FIRST_OPC);

        rc = lustre_pack_reply(req, 1, NULL, NULL);
        if (rc) {
                CERROR("out of memory\n");
                RETURN(-ENOMEM);
        }

        if (!ldlm_request_cancel(req, dlm_req, 0))
                req->rq_status = ESTALE;

        if (ptlrpc_reply(req) != 0)
                LBUG();

        RETURN(0);
}

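/* Client-side handler for a blocking AST from the server: mark the lock
 * CBPENDING and, if it is no longer in use, run its blocking callback to
 * cancel it now; otherwise it will be cancelled when the last user
 * releases it. */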
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
        int do_ast;
        ENTRY;

        LDLM_DEBUG(lock, "client blocking AST callback handler START");

        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;

        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                lock->l_flags |= LDLM_FL_CANCEL;

        do_ast = (!lock->l_readers && !lock->l_writers);
        unlock_res_and_lock(lock);

        if (do_ast) {
                LDLM_DEBUG(lock, "already unused, calling "
                           "callback (%p)", lock->l_blocking_ast);
                if (lock->l_blocking_ast != NULL)
                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,
                                             LDLM_CB_BLOCKING);
        } else {
                LDLM_DEBUG(lock, "Lock still has references, will be"
                           " cancelled later");
        }

        LDLM_DEBUG(lock, "client blocking callback handler END");
        LDLM_LOCK_PUT(lock);
        EXIT;
}

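/* Client-side handler for a completion AST: bring the lock's mode, policy
 * data, resource and LVB in line with the server's view, grant the lock,
 * and run any completion AST work that becomes ready as a result. */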
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        CFS_LIST_HEAD(ast_list);
        ENTRY;

        LDLM_DEBUG(lock, "client completion callback handler START");

        lock_res_and_lock(lock);

        /* If we receive the completion AST before the actual enqueue returned,
         * then we might need to switch lock modes, resources, or extents. */
        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                LDLM_DEBUG(lock, "completion AST, new lock mode");
        }

        if (lock->l_resource->lr_type != LDLM_PLAIN) {
                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
                LDLM_DEBUG(lock, "completion AST, new policy data");
        }

        ldlm_resource_unlink_lock(lock);
        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                   &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) != 0) {
                unlock_res_and_lock(lock);
                ldlm_lock_change_resource(ns, lock,
                                        &dlm_req->lock_desc.l_resource.lr_name);
                LDLM_DEBUG(lock, "completion AST, new resource");
                CERROR("change resource!\n");
                lock_res_and_lock(lock);
        }

        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
                lock->l_flags |= LDLM_FL_CBPENDING;
                LDLM_DEBUG(lock, "completion AST includes blocking AST");
        }

        if (lock->l_lvb_len) {
                void *lvb;
                lvb = lustre_swab_reqbuf(req, DLM_REQ_REC_OFF, lock->l_lvb_len,
                                         lock->l_lvb_swabber);
                if (lvb == NULL) {
                        LDLM_ERROR(lock, "completion AST did not contain "
                                   "expected LVB!");
                } else {
                        memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
                }
        }

        ldlm_grant_lock(lock, &ast_list);
        unlock_res_and_lock(lock);

        LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");

        ldlm_run_cp_ast_work(&ast_list);

        LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                          lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}

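/* Client-side handler for a glimpse AST: let the lock's glimpse callback
 * fill in the reply, then opportunistically cancel an unused PW lock that
 * has been idle for more than ten seconds. */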
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int rc = -ENOSYS;
        ENTRY;

        LDLM_DEBUG(lock, "client glimpse AST callback handler");

        if (lock->l_glimpse_ast != NULL)
                rc = lock->l_glimpse_ast(lock, req);

        if (req->rq_repmsg != NULL) {
                ptlrpc_reply(req);
        } else {
                req->rq_status = rc;
                ptlrpc_error(req);
        }

        lock_res_and_lock(lock);
        if (lock->l_granted_mode == LCK_PW &&
            !lock->l_readers && !lock->l_writers &&
            cfs_time_after(cfs_time_current(),
                           cfs_time_add(lock->l_last_used,
                                        cfs_time_seconds(10)))) {
                unlock_res_and_lock(lock);
                if (ldlm_bl_to_thread_lock(ns, NULL, lock))
                        ldlm_handle_bl_callback(ns, NULL, lock);

                EXIT;
                return;
        }
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}

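/* Helper to send a reply with status @rc, packing a minimal reply buffer
 * first if the request does not already have one. */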
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
        req->rq_status = rc;
        if (req->rq_reply_state == NULL) {
                rc = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc)
                        return rc;
        }
        return ptlrpc_reply(req);
}

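/* Queue a work item for the blocking-callback thread pool.  Exactly one of
 * two payloads is carried: a single lock plus its lock descriptor (for a
 * blocking AST received from the server), or a list of @count locks spliced
 * from @cancels (for asynchronous LRU cancellation).  A pool thread picks
 * the item up in ldlm_bl_thread_main() below. */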
#ifdef __KERNEL__
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
                             struct list_head *cancels, int count)
{
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
        struct ldlm_bl_work_item *blwi;
        ENTRY;

        if (cancels && count == 0)
                RETURN(0);

        OBD_ALLOC(blwi, sizeof(*blwi));
        if (blwi == NULL)
                RETURN(-ENOMEM);

        blwi->blwi_ns = ns;
        if (ld != NULL)
                blwi->blwi_ld = *ld;
        if (count) {
                list_add(&blwi->blwi_head, cancels);
                list_del_init(cancels);
                blwi->blwi_count = count;
        } else {
                blwi->blwi_lock = lock;
        }
        spin_lock(&blp->blp_lock);
        list_add_tail(&blwi->blwi_entry, &blp->blp_list);
        cfs_waitq_signal(&blp->blp_waitq);
        spin_unlock(&blp->blp_lock);

        RETURN(0);
}
#endif

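/* Thin wrappers for callers holding either a single lock or a cancel list.
 * In userspace builds there is no thread pool, so these return -ENOSYS and
 * the caller falls back to handling the callback synchronously. */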
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                           struct ldlm_lock *lock)
{
#ifdef __KERNEL__
        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
#else
        RETURN(-ENOSYS);
#endif
}

int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                           struct list_head *cancels, int count)
{
#ifdef __KERNEL__
        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
#else
        RETURN(-ENOSYS);
#endif
}

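/* Main service handler for the callback service (LDLM_CB_REQUEST_PORTAL):
 * this is how blocking, completion, and glimpse ASTs arrive from servers.
 * A handful of llog and quota opcodes also arrive on this portal and are
 * serviced inline. */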
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
        struct ldlm_namespace *ns;
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        int rc;
        ENTRY;

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;

                CDEBUG(D_RPCTRACE, "operation %d from %s with bad "
                       "export cookie "LPX64"; this is "
                       "normal if this node rebooted with a lock held\n",
                       lustre_msg_get_opc(req->rq_reqmsg),
                       libcfs_id2str(req->rq_peer),
                       lustre_msg_get_handle(req->rq_reqmsg)->cookie);

                dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
                                             sizeof(*dlm_req),
                                             lustre_swab_ldlm_request);
                if (dlm_req != NULL)
                        CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
                               dlm_req->lock_handle[0].cookie);

                ldlm_callback_reply(req, -ENOTCONN);
                RETURN(0);
        }

        LASSERT(req->rq_export != NULL);
        LASSERT(req->rq_export->exp_obd != NULL);

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                break;
        case LDLM_CP_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
                break;
        case LDLM_GL_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
                break;
        case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
                rc = llog_origin_handle_cancel(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case OBD_QC_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_QC_CALLBACK_NET, 0);
                rc = target_handle_qc_callback(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case QUOTA_DQACQ:
        case QUOTA_DQREL:
                /* reply in handler */
                rc = target_handle_dqacq_callback(req);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_CREATE:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_create(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_next_block(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_READ_HEADER:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_read_header(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_CLOSE:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_close(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        default:
                CERROR("unknown opcode %u\n",
                       lustre_msg_get_opc(req->rq_reqmsg));
                ldlm_callback_reply(req, -EPROTO);
                RETURN(0);
        }

        ns = req->rq_export->exp_obd->obd_namespace;
        LASSERT(ns != NULL);

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("can't unpack dlm_req\n");
                ldlm_callback_reply(req, -EPROTO);
                RETURN(0);
        }

        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]);
        if (!lock) {
                CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
                       "disappeared\n", dlm_req->lock_handle[0].cookie);
                ldlm_callback_reply(req, -EINVAL);
                RETURN(0);
        }

        /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
        lock_res_and_lock(lock);
        lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
        if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
                /* If somebody cancels locks and the cache is already dropped,
                 * we can tell the server we have no lock.  Otherwise, we
                 * should send the cancel after dropping the cache. */
                if ((lock->l_flags & LDLM_FL_CANCELING) &&
                    (lock->l_flags & LDLM_FL_BL_DONE)) {
                        LDLM_DEBUG(lock, "callback on lock "
                                   LPX64" - lock disappeared",
                                   dlm_req->lock_handle[0].cookie);
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_PUT(lock);
                        ldlm_callback_reply(req, -EINVAL);
                        RETURN(0);
                }
                lock->l_flags |= LDLM_FL_BL_AST;
        }
        unlock_res_and_lock(lock);

        /* We want the ost thread to get this reply so that it can respond
         * to ost requests (write cache writeback) that might be triggered
         * in the callback.
         *
         * But we'd also like to be able to indicate in the reply that we're
         * cancelling right now, because it's unused, or have an intent result
         * in the reply, so we might have to push the responsibility for sending
         * the reply down into the AST handlers, alas. */

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
                CDEBUG(D_INODE, "blocking ast\n");
                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
                        ldlm_callback_reply(req, 0);
                if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
                        ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                break;
        case LDLM_CP_CALLBACK:
                CDEBUG(D_INODE, "completion ast\n");
                ldlm_callback_reply(req, 0);
                ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                break;
        case LDLM_GL_CALLBACK:
                CDEBUG(D_INODE, "glimpse ast\n");
                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
                break;
        default:
                LBUG();                         /* checked above */
        }

        RETURN(0);
}

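/* Service handler for LDLM_CANCEL requests arriving on the separate
 * LDLM_CANCEL_REQUEST_PORTAL, so cancel traffic is not queued behind the
 * callback traffic handled above. */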
static int ldlm_cancel_handler(struct ptlrpc_request *req)
{
        int rc;
        ENTRY;

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;

                CERROR("operation %d from %s with bad export cookie "LPU64"\n",
                       lustre_msg_get_opc(req->rq_reqmsg),
                       libcfs_id2str(req->rq_peer),
                       lustre_msg_get_handle(req->rq_reqmsg)->cookie);

                dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
                                             sizeof(*dlm_req),
                                             lustre_swab_ldlm_request);
                if (dlm_req != NULL)
                        ldlm_lock_dump_handle(D_ERROR,
                                              &dlm_req->lock_handle[0]);
                ldlm_callback_reply(req, -ENOTCONN);
                RETURN(0);
        }

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {

        /* XXX FIXME move this back to mds/handler.c, bug 249 */
        case LDLM_CANCEL:
                CDEBUG(D_INODE, "cancel\n");
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
                rc = ldlm_handle_cancel(req);
                if (rc)
                        break;
                RETURN(0);
        case OBD_LOG_CANCEL:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
                rc = llog_origin_handle_cancel(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        default:
                CERROR("invalid opcode %d\n",
                       lustre_msg_get_opc(req->rq_reqmsg));
                ldlm_callback_reply(req, -EINVAL);
        }

        RETURN(0);
}

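/* Send a blocking AST for every granted PLAIN or IBITS lock held by @exp
 * that has not already had one sent, forcing the client to cancel them.
 * Candidate locks are first moved off the export's list under led_lock;
 * the (potentially sleeping) ASTs then run outside any spinlock. */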
void ldlm_revoke_export_locks(struct obd_export *exp)
{
        struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks;
        struct list_head  rpc_list;
        struct ldlm_lock *lock, *next;
        struct ldlm_lock_desc desc;

        ENTRY;
        INIT_LIST_HEAD(&rpc_list);

        spin_lock(&exp->exp_ldlm_data.led_lock);
        list_for_each_entry_safe(lock, next, locklist, l_export_chain) {
                lock_res_and_lock(lock);

                if (lock->l_req_mode != lock->l_granted_mode) {
                        unlock_res_and_lock(lock);
                        continue;
                }

                LASSERT(lock->l_resource);
                if (lock->l_resource->lr_type != LDLM_IBITS &&
                    lock->l_resource->lr_type != LDLM_PLAIN) {
                        unlock_res_and_lock(lock);
                        continue;
                }

                if (lock->l_flags & LDLM_FL_AST_SENT) {
                        unlock_res_and_lock(lock);
                        continue;
                }

                LASSERT(lock->l_blocking_ast);
                LASSERT(!lock->l_blocking_lock);

                lock->l_flags |= LDLM_FL_AST_SENT;
                list_move(&lock->l_export_chain, &rpc_list);

                unlock_res_and_lock(lock);
        }
        spin_unlock(&exp->exp_ldlm_data.led_lock);

        while (!list_empty(&rpc_list)) {
                lock = list_entry(rpc_list.next, struct ldlm_lock,
                                  l_export_chain);
                list_del_init(&lock->l_export_chain);

                /* the desc just pretends that a conflicting exclusive lock
                 * is being requested */
                ldlm_lock2desc(lock, &desc);
                desc.l_req_mode = LCK_EX;
                desc.l_granted_mode = 0;

                LDLM_LOCK_GET(lock);
                lock->l_blocking_ast(lock, &desc, lock->l_ast_data,
                                     LDLM_CB_BLOCKING);
                LDLM_LOCK_PUT(lock);
        }
        EXIT;
}

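/* Consumer side of the blocking-callback thread pool: ldlm_bl_get_work()
 * pops one work item off blp_list under blp_lock, and each pool thread
 * loops in ldlm_bl_thread_main() until it receives an item with a NULL
 * blwi_ns, which is the signal to exit (see ldlm_cleanup()). */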
#ifdef __KERNEL__
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
        struct ldlm_bl_work_item *blwi = NULL;

        spin_lock(&blp->blp_lock);
        if (!list_empty(&blp->blp_list)) {
                blwi = list_entry(blp->blp_list.next, struct ldlm_bl_work_item,
                                  blwi_entry);
                list_del(&blwi->blwi_entry);
        }
        spin_unlock(&blp->blp_lock);

        return blwi;
}

struct ldlm_bl_thread_data {
        int                     bltd_num;
        struct ldlm_bl_pool     *bltd_blp;
};

static int ldlm_bl_thread_main(void *arg)
{
        struct ldlm_bl_thread_data *bltd = arg;
        struct ldlm_bl_pool *blp = bltd->bltd_blp;
        ENTRY;

        {
                char name[CFS_CURPROC_COMM_MAX];
                snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
                         bltd->bltd_num);
                cfs_daemonize(name);
        }

        atomic_inc(&blp->blp_num_threads);
        complete(&blp->blp_comp);

        while (1) {
                struct l_wait_info lwi = { 0 };
                struct ldlm_bl_work_item *blwi = NULL;

                l_wait_event_exclusive(blp->blp_waitq,
                                       (blwi = ldlm_bl_get_work(blp)) != NULL,
                                       &lwi);

                if (blwi->blwi_ns == NULL)
                        break;

                if (blwi->blwi_count) {
                        /* The special case when we cancel LRU locks
                         * asynchronously: we pass a list of locks here.
                         * Each lock is marked LDLM_FL_CANCELING and has
                         * already been canceled locally. */
                        ldlm_cli_cancel_list(&blwi->blwi_head,
                                             blwi->blwi_count, NULL, 0, 0);
                } else {
                        ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                blwi->blwi_lock);
                }
                OBD_FREE(blwi, sizeof(*blwi));
        }

        atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
        RETURN(0);
}

#endif

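/* The LDLM is brought up on first use and torn down on last use: the first
 * reference runs ldlm_setup(), the last put runs ldlm_cleanup().  A minimal
 * usage sketch (illustrative only, not code from this file):
 *
 *      if (ldlm_get_ref() == 0) {
 *              ... use the lock manager ...
 *              ldlm_put_ref(0);
 *      }
 */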
static int ldlm_setup(void);
static int ldlm_cleanup(int force);

int ldlm_get_ref(void)
{
        int rc = 0;
        ENTRY;
        mutex_down(&ldlm_ref_sem);
        if (++ldlm_refcount == 1) {
                rc = ldlm_setup();
                if (rc)
                        ldlm_refcount--;
        }
        mutex_up(&ldlm_ref_sem);

        RETURN(rc);
}

void ldlm_put_ref(int force)
{
        ENTRY;
        mutex_down(&ldlm_ref_sem);
        if (ldlm_refcount == 1) {
                int rc = ldlm_cleanup(force);
                if (rc)
                        CERROR("ldlm_cleanup failed: %d\n", rc);
                else
                        ldlm_refcount--;
        } else {
                ldlm_refcount--;
        }
        mutex_up(&ldlm_ref_sem);

        EXIT;
}

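/* Bring up the LDLM: /proc entries, the callback and cancel ptlrpc
 * services, the blocking-callback thread pool, and (in the kernel) the
 * expired-lock thread plus the waiting-locks timer.  Called once, from
 * ldlm_get_ref(). */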
static int ldlm_setup(void)
{
        struct ldlm_bl_pool *blp;
        int rc = 0;
#ifdef __KERNEL__
        int i;
#endif
        ENTRY;

        if (ldlm_state != NULL)
                RETURN(-EALREADY);

        OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
        if (ldlm_state == NULL)
                RETURN(-ENOMEM);

#ifdef LPROCFS
        rc = ldlm_proc_setup();
        if (rc != 0)
                GOTO(out_free, rc);
#endif

        ldlm_state->ldlm_cb_service =
                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
                                LDLM_CB_REPLY_PORTAL,
                                ldlm_get_rq_timeout(ldlm_timeout, obd_timeout),
                                ldlm_callback_handler, "ldlm_cbd",
                                ldlm_svc_proc_dir, NULL,
                                LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
                                "ldlm_cb",
                                LCT_MD_THREAD|LCT_DT_THREAD);

        if (!ldlm_state->ldlm_cb_service) {
                CERROR("failed to start service\n");
                GOTO(out_proc, rc = -ENOMEM);
        }

        ldlm_state->ldlm_cancel_service =
                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
                                LDLM_CANCEL_REPLY_PORTAL,
                                ldlm_get_rq_timeout(ldlm_timeout, obd_timeout),
                                ldlm_cancel_handler, "ldlm_canceld",
                                ldlm_svc_proc_dir, NULL,
                                LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
                                "ldlm_cn",
                                LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD);

        if (!ldlm_state->ldlm_cancel_service) {
                CERROR("failed to start service\n");
                GOTO(out_proc, rc = -ENOMEM);
        }

        OBD_ALLOC(blp, sizeof(*blp));
        if (blp == NULL)
                GOTO(out_proc, rc = -ENOMEM);
        ldlm_state->ldlm_bl_pool = blp;

        atomic_set(&blp->blp_num_threads, 0);
        cfs_waitq_init(&blp->blp_waitq);
        spin_lock_init(&blp->blp_lock);

        CFS_INIT_LIST_HEAD(&blp->blp_list);

#ifdef __KERNEL__
        for (i = 0; i < LDLM_BL_THREADS; i++) {
                struct ldlm_bl_thread_data bltd = {
                        .bltd_num = i,
                        .bltd_blp = blp,
                };
                init_completion(&blp->blp_comp);
                rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
                if (rc < 0) {
                        CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
                        GOTO(out_thread, rc);
                }
                wait_for_completion(&blp->blp_comp);
        }

        rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);
        if (rc)
                GOTO(out_thread, rc);

        rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cb_service);
        if (rc)
                GOTO(out_thread, rc);

        CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
        expired_lock_thread.elt_state = ELT_STOPPED;
        cfs_waitq_init(&expired_lock_thread.elt_waitq);

        CFS_INIT_LIST_HEAD(&waiting_locks_list);
        spin_lock_init(&waiting_locks_spinlock);
        cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);

        rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
        if (rc < 0) {
                CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                GOTO(out_thread, rc);
        }

        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_READY);
#endif

        RETURN(0);

#ifdef __KERNEL__
 out_thread:
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
#endif

 out_proc:
#ifdef LPROCFS
        ldlm_proc_cleanup();
 out_free:
#endif
        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;
        return rc;
}

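/* Tear down everything ldlm_setup() created.  Fails with -EBUSY while any
 * namespace still exists.  Each bl pool thread is stopped by queueing one
 * "poison" work item with a NULL blwi_ns, which makes the thread break out
 * of its loop. */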
static int ldlm_cleanup(int force)
{
#ifdef __KERNEL__
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
#endif
        ENTRY;

        if (!list_empty(&ldlm_namespace_list)) {
                CERROR("ldlm still has namespaces; clean these up first.\n");
                ldlm_dump_all_namespaces(D_DLMTRACE);
                RETURN(-EBUSY);
        }

#ifdef __KERNEL__
        while (atomic_read(&blp->blp_num_threads) > 0) {
                struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

                init_completion(&blp->blp_comp);

                spin_lock(&blp->blp_lock);
                list_add_tail(&blwi.blwi_entry, &blp->blp_list);
                cfs_waitq_signal(&blp->blp_waitq);
                spin_unlock(&blp->blp_lock);

                wait_for_completion(&blp->blp_comp);
        }
        OBD_FREE(blp, sizeof(*blp));

        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
        ldlm_proc_cleanup();

        expired_lock_thread.elt_state = ELT_TERMINATE;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_STOPPED);
#else
        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
#endif

        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;

        RETURN(0);
}

int __init ldlm_init(void)
{
        init_mutex(&ldlm_ref_sem);
        init_mutex(&ldlm_namespace_lock);
        ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
                                               sizeof(struct ldlm_resource), 0,
                                               SLAB_HWCACHE_ALIGN);
        if (ldlm_resource_slab == NULL)
                return -ENOMEM;

        ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
                                           sizeof(struct ldlm_lock), 0,
                                           SLAB_HWCACHE_ALIGN);
        if (ldlm_lock_slab == NULL) {
                cfs_mem_cache_destroy(ldlm_resource_slab);
                return -ENOMEM;
        }

        return 0;
}

void __exit ldlm_exit(void)
{
        int rc;
        if (ldlm_refcount)
                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
        rc = cfs_mem_cache_destroy(ldlm_resource_slab);
        LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
        rc = cfs_mem_cache_destroy(ldlm_lock_slab);
        LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
}

/* ldlm_extent.c */
EXPORT_SYMBOL(ldlm_extent_shift_kms);

/* ldlm_lock.c */
EXPORT_SYMBOL(ldlm_get_processing_policy);
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_get);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_set_data);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match);

/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_blocking_ast);
EXPORT_SYMBOL(ldlm_glimpse_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_prep_enqueue_req);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
EXPORT_SYMBOL(ldlm_cli_enqueue_local);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
EXPORT_SYMBOL(ldlm_cli_cancel_req);
EXPORT_SYMBOL(ldlm_cli_join_lru);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_resource_iterate);
EXPORT_SYMBOL(ldlm_cancel_resource_local);
EXPORT_SYMBOL(ldlm_cli_cancel_list);

/* ldlm_lockd.c */
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_server_glimpse_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_enqueue0);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_request_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_handle_convert0);
EXPORT_SYMBOL(ldlm_del_waiting_lock);
EXPORT_SYMBOL(ldlm_get_ref);
EXPORT_SYMBOL(ldlm_put_ref);
EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
EXPORT_SYMBOL(ldlm_revoke_export_locks);

/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);
EXPORT_SYMBOL(ldlm_dump_all_namespaces);
EXPORT_SYMBOL(ldlm_resource_get);
EXPORT_SYMBOL(ldlm_resource_putref);
EXPORT_SYMBOL(ldlm_resource_unlink_lock);

/* ldlm_lib.c */
EXPORT_SYMBOL(client_import_add_conn);
EXPORT_SYMBOL(client_import_del_conn);
EXPORT_SYMBOL(client_obd_setup);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
EXPORT_SYMBOL(target_start_recovery_thread);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cleanup_recovery);
EXPORT_SYMBOL(target_destroy_export);
EXPORT_SYMBOL(target_cancel_recovery_timer);
EXPORT_SYMBOL(target_send_reply);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_handle_ping);
EXPORT_SYMBOL(target_handle_disconnect);

/* l_lock.c */
EXPORT_SYMBOL(lock_res_and_lock);
EXPORT_SYMBOL(unlock_res_and_lock);