Whamcloud - gitweb
- landed b_hd_cray_merge3
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #ifndef EXPORT_SYMTAB
25 # define EXPORT_SYMTAB
26 #endif
27 #define DEBUG_SUBSYSTEM S_LDLM
28
29 #ifdef __KERNEL__
30 # include <linux/module.h>
31 # include <linux/slab.h>
32 # include <linux/init.h>
33 # include <linux/wait.h>
34 #else
35 # include <liblustre.h>
36 #endif
37
38 #include <linux/lustre_dlm.h>
39 #include <linux/obd_class.h>
40 #include <libcfs/list.h>
41 #include "ldlm_internal.h"
42
/* Slab caches and lists defined in ldlm_resource.c / ldlm_lock.c. */
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
extern struct list_head ldlm_namespace_list;

/* Serializes ldlm service setup/teardown; guards ldlm_refcount. */
static DECLARE_MUTEX(ldlm_ref_sem);
static int ldlm_refcount;
/* LDLM state */

static struct ldlm_state *ldlm_state;
53
54 inline unsigned long round_timeout(unsigned long timeout)
55 {
56         return ((timeout / HZ) + 1) * HZ;
57 }
58
#ifdef __KERNEL__
/* XXX should this be per-ldlm? */
/* Locks for which we are waiting on the client's cancel callback;
 * guarded by waiting_locks_spinlock, expiry driven by
 * waiting_locks_timer (see waiting_locks_callback()). */
static struct list_head waiting_locks_list;
static spinlock_t waiting_locks_spinlock;
static struct timer_list waiting_locks_timer;

/* State of the "expired lock" thread (expired_lock_main()), which
 * evicts clients whose callback timeout has fired. */
static struct expired_lock_thread {
        wait_queue_head_t         elt_waitq;          /* thread sleeps here */
        int                       elt_state;          /* ELT_* below */
        struct list_head          elt_expired_locks;  /* timed-out locks */
        spinlock_t                elt_lock;           /* guards elt_expired_locks */
} expired_lock_thread;
#endif

#if !defined(ENOTSUPP)
#  define ENOTSUPP 524
#endif

/* Values for expired_lock_thread.elt_state. */
#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2
80
/* Pool of blocking-AST worker threads and their shared work queue. */
struct ldlm_bl_pool {
        spinlock_t              blp_lock;         /* guards blp_list */
        struct list_head        blp_list;         /* queued ldlm_bl_work_item */
        wait_queue_head_t       blp_waitq;        /* workers wait for work here */
        atomic_t                blp_num_threads;
        struct completion       blp_comp;         /* thread start/stop sync */
};

/* One queued blocking-AST callback for the pool threads to process. */
struct ldlm_bl_work_item {
        struct list_head        blwi_entry;       /* link on blp_list */
        struct ldlm_namespace   *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock        *blwi_lock;
};
95
96 #ifdef __KERNEL__
97
98 static inline int have_expired_locks(void)
99 {
100         int need_to_run;
101
102         spin_lock_bh(&expired_lock_thread.elt_lock);
103         need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
104         spin_unlock_bh(&expired_lock_thread.elt_lock);
105
106         RETURN(need_to_run);
107 }
108
109 static int expired_lock_main(void *arg)
110 {
111         struct list_head *expired = &expired_lock_thread.elt_expired_locks;
112         struct l_wait_info lwi = { 0 };
113         unsigned long flags;
114
115         ENTRY;
116         lock_kernel();
117         kportal_daemonize("ldlm_elt");
118
119         SIGNAL_MASK_LOCK(current, flags);
120         sigfillset(&current->blocked);
121         RECALC_SIGPENDING;
122         SIGNAL_MASK_UNLOCK(current, flags);
123
124         unlock_kernel();
125
126         expired_lock_thread.elt_state = ELT_READY;
127         wake_up(&expired_lock_thread.elt_waitq);
128
129         while (1) {
130                 l_wait_event(expired_lock_thread.elt_waitq,
131                              have_expired_locks() ||
132                              expired_lock_thread.elt_state == ELT_TERMINATE,
133                              &lwi);
134
135                 spin_lock_bh(&expired_lock_thread.elt_lock);
136                 while (!list_empty(expired)) {
137                         struct obd_export *export;
138                         struct ldlm_lock *lock;
139
140                         lock = list_entry(expired->next, struct ldlm_lock,
141                                           l_pending_chain);
142                         if ((void *)lock < LP_POISON + PAGE_SIZE &&
143                             (void *)lock >= LP_POISON) {
144                                 CERROR("free lock on elt list %p\n", lock);
145                                 LBUG();
146                         }
147                         list_del_init(&lock->l_pending_chain);
148                         if ((void *)lock->l_export < LP_POISON + PAGE_SIZE &&
149                             (void *)lock->l_export >= LP_POISON + PAGE_SIZE) {
150                                 CERROR("lock with free export on elt list %p\n",
151                                        export);
152                                 lock->l_export = NULL;
153                                 LDLM_ERROR(lock, "free export\n");
154                                 continue;
155                         }
156                         export = class_export_get(lock->l_export);
157                         spin_unlock_bh(&expired_lock_thread.elt_lock);
158
159                         ptlrpc_fail_export(export);
160                         class_export_put(export);
161                         spin_lock_bh(&expired_lock_thread.elt_lock);
162                 }
163                 spin_unlock_bh(&expired_lock_thread.elt_lock);
164
165                 if (expired_lock_thread.elt_state == ELT_TERMINATE)
166                         break;
167         }
168
169         expired_lock_thread.elt_state = ELT_STOPPED;
170         wake_up(&expired_lock_thread.elt_waitq);
171         RETURN(0);
172 }
173
/* Timer callback: scan waiting_locks_list (FIFO, oldest deadline first)
 * and hand every lock whose callback timeout has passed to the
 * expired-lock thread for client eviction, then re-arm the timer for
 * the next pending lock, if any. */
static void waiting_locks_callback(unsigned long unused)
{
        struct ldlm_lock *lock;
        char str[PTL_NALFMT_SIZE];

        if (obd_dump_on_timeout)
                portals_debug_dumplog();

        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);

                /* List is in expiry order: stop at the first lock that
                 * hasn't timed out.  GROUP locks are never expired here. */
                if ((lock->l_callback_timeout > jiffies) ||
                    (lock->l_req_mode == LCK_GROUP))
                        break;

                LDLM_ERROR(lock, "lock callback timer expired: evicting client "
                           "%s@%s nid %s ",
                           lock->l_export->exp_client_uuid.uuid,
                           lock->l_export->exp_connection->c_remote_uuid.uuid,
                           ptlrpc_peernid2str(&lock->l_export->exp_connection->c_peer, str));

                /* Move the lock to expired_lock_main()'s queue and wake it. */
                spin_lock_bh(&expired_lock_thread.elt_lock);
                list_del(&lock->l_pending_chain);
                list_add(&lock->l_pending_chain,
                         &expired_lock_thread.elt_expired_locks);
                spin_unlock_bh(&expired_lock_thread.elt_lock);
                wake_up(&expired_lock_thread.elt_waitq);
        }

        /*
         * Make sure the timer will fire again if we have any locks
         * left.
         */
        if (!list_empty(&waiting_locks_list)) {
                unsigned long timeout_rounded;
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
                timeout_rounded = round_timeout(lock->l_callback_timeout);
                mod_timer(&waiting_locks_timer, timeout_rounded);
        }
        spin_unlock_bh(&waiting_locks_spinlock);
}
218
219 /*
220  * Indicate that we're waiting for a client to call us back cancelling a given
221  * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
222  * timer to fire appropriately.  (We round up to the next second, to avoid
223  * floods of timer firings during periods of high lock contention and traffic).
224  */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        unsigned long timeout_rounded;

        spin_lock_bh(&waiting_locks_spinlock);
        if (!list_empty(&lock->l_pending_chain)) {
                /* Already queued: keep the original (earlier) deadline. */
                LDLM_DEBUG(lock, "not re-adding to wait list");
                spin_unlock_bh(&waiting_locks_spinlock);
                return 0;
        }
        LDLM_DEBUG(lock, "adding to wait list");

        /* Client gets half of obd_timeout to answer the AST. */
        lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);

        timeout_rounded = round_timeout(lock->l_callback_timeout);

        /* Pull the timer forward if this lock expires before the
         * currently-armed deadline, or arm it if it's idle. */
        if (timeout_rounded < waiting_locks_timer.expires ||
            !timer_pending(&waiting_locks_timer)) {
                mod_timer(&waiting_locks_timer, timeout_rounded);
        }
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
        spin_unlock_bh(&waiting_locks_spinlock);
        return 1;
}
249
250 /*
251  * Remove a lock from the pending list, likely because it had its cancellation
252  * callback arrive without incident.  This adjusts the lock-timeout timer if
253  * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
254  */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        struct list_head *list_next;

        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);

        if (list_empty(&lock->l_pending_chain)) {
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
                return 0;
        }

        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        del_timer(&waiting_locks_timer);
                } else {
                        struct ldlm_lock *next;
                        next = list_entry(list_next, struct ldlm_lock,
                                          l_pending_chain);
                        mod_timer(&waiting_locks_timer,
                                  round_timeout(next->l_callback_timeout));
                }
        }

        /* Take elt_lock too: waiting_locks_callback() may have already
         * moved this lock onto the expired-lock thread's list. */
        spin_lock_bh(&expired_lock_thread.elt_lock);
        list_del_init(&lock->l_pending_chain);
        spin_unlock_bh(&expired_lock_thread.elt_lock);

        spin_unlock_bh(&waiting_locks_spinlock);
        LDLM_DEBUG(lock, "removed");
        return 1;
}
296
297 #else /* !__KERNEL__ */
298
/* Userspace (liblustre) build: there is no waiting-locks list, so
 * report the lock as successfully "added". */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(1);
}
303
/* Userspace (liblustre) build: nothing is ever waiting, so report
 * "wasn't pending" (0), matching the kernel version's contract. */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}
308
309 #endif /* __KERNEL__ */
310
/* A client failed to answer an AST: log the failure, optionally dump
 * the debug log, and evict (fail) the client's export. */
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,const char *ast_type)
{
        struct ptlrpc_connection *conn = lock->l_export->exp_connection;
        char str[PTL_NALFMT_SIZE];

        LDLM_ERROR(lock, "%s AST failed (%d): evicting client %s@%s NID "LPX64
                   " (%s)", ast_type, rc, lock->l_export->exp_client_uuid.uuid,
                   conn->c_remote_uuid.uuid, conn->c_peer.peer_id.nid,
                   ptlrpc_peernid2str(&conn->c_peer, str));

        if (obd_dump_on_timeout)
                portals_debug_dumplog();
        ptlrpc_fail_export(lock->l_export);
}
325
326 static int ldlm_handle_ast_error(struct ldlm_lock *lock,
327                                  struct ptlrpc_request *req, int rc,
328                                  const char *ast_type)
329 {
330         struct ptlrpc_peer *peer = &req->rq_import->imp_connection->c_peer;
331         char str[PTL_NALFMT_SIZE];
332
333         if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
334                 LASSERT(lock->l_export);
335                 if (lock->l_export->exp_libclient) {
336                         LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
337                                    " timeout, just cancelling lock", ast_type,
338                                    ptlrpc_peernid2str(peer, str));
339                         ldlm_lock_cancel(lock);
340                         rc = -ERESTART;
341                 } else {
342                         l_lock(&lock->l_resource->lr_namespace->ns_lock);
343                         ldlm_del_waiting_lock(lock);
344                         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
345                         ldlm_failed_ast(lock, rc, ast_type);
346                 }
347         } else if (rc) {
348                 if (rc == -EINVAL)
349                         LDLM_DEBUG(lock, "client (nid %s) returned %d"
350                                    " from %s AST - normal race",
351                                    ptlrpc_peernid2str(peer, str),
352                                    req->rq_repmsg->status, ast_type);
353                 else
354                         LDLM_ERROR(lock, "client (nid %s) returned %d "
355                                    "from %s AST", ptlrpc_peernid2str(peer, str),
356                                    (req->rq_repmsg != NULL) ?
357                                    req->rq_repmsg->status : 0, ast_type);
358                 ldlm_lock_cancel(lock);
359                 /* Server-side AST functions are called from ldlm_reprocess_all,
360                  * which needs to be told to please restart its reprocessing. */
361                 rc = -ERESTART;
362         }
363
364         return rc;
365 }
366
/* Send a blocking AST over the reverse import, asking the client that
 * holds 'lock' to cancel it.  Only sent for granted locks (ungranted
 * locks get the flag piggy-backed on the completion AST); arms the
 * client-eviction timer via ldlm_add_waiting_lock().  'flag'
 * distinguishes the blocking from the cancelling callback path. */
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int rc = 0, size = sizeof(*body);
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        LASSERT(lock);

        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (lock->l_granted_mode != lock->l_req_mode) {
                /* this blocking AST will be communicated as part of the
                 * completion AST instead */
                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(0);
        }

        if (lock->l_destroyed) {
                /* What's the point? */
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(0);
        }

#if 0
        if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
                ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(-ETIMEDOUT);
        }
#endif

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK,
                              1, &size, NULL);
        if (req == NULL) {
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(-ENOMEM);
        }

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        memcpy(&body->lock_desc, desc, sizeof(*desc));
        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);

        LDLM_DEBUG(lock, "server preparing blocking AST");
        req->rq_replen = lustre_msg_size(0, NULL);

        /* Re-check granted under the namespace lock before starting the
         * lock-timeout clock. */
        if (lock->l_granted_mode == lock->l_req_mode)
                ldlm_add_waiting_lock(lock);
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
        rc = ptlrpc_queue_wait(req);
        if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "blocking");

        ptlrpc_req_finished(req);

        RETURN(rc);
}
437
438 /* XXX copied from ptlrpc/service.c */
/* Difference (large - small) expressed in microseconds.  Note: the
 * result overflows a 32-bit long for intervals beyond ~35 minutes. */
static long timeval_sub(struct timeval *large, struct timeval *small)
{
        long sec_delta = large->tv_sec - small->tv_sec;
        long usec_delta = large->tv_usec - small->tv_usec;

        return sec_delta * 1000000 + usec_delta;
}
444
/* Send a completion AST telling the client its lock was granted.  The
 * request carries the lock desc, flags and, when the resource has one,
 * the current LVB.  If a blocking AST was already flagged
 * (LDLM_FL_AST_SENT) it is piggy-backed here and the eviction timer
 * is started. */
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        struct timeval granted_time;
        long total_enqueue_wait;
        int rc = 0, size[2] = {sizeof(*body)}, buffers = 1;
        ENTRY;

        LASSERT(lock != NULL);

        /* How long did this lock wait between enqueue and grant? */
        do_gettimeofday(&granted_time);
        total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);

        if (total_enqueue_wait / 1000000 > obd_timeout)
                LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);

        /* NOTE(review): lr_lvb_len is sampled here and the buffer packed
         * later under a separate lr_lvb_sem hold -- assumes the length
         * doesn't change in between; confirm against lvbo code. */
        down(&lock->l_resource->lr_lvb_sem);
        if (lock->l_resource->lr_lvb_len) {
                buffers = 2;
                size[1] = lock->l_resource->lr_lvb_len;
        }
        up(&lock->l_resource->lr_lvb_sem);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
                              buffers, size, NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);

        if (buffers == 2) {
                /* Copy the resource's current LVB into the request. */
                void *lvb;

                down(&lock->l_resource->lr_lvb_sem);
                lvb = lustre_msg_buf(req->rq_reqmsg, 1,
                                     lock->l_resource->lr_lvb_len);

                memcpy(lvb, lock->l_resource->lr_lvb_data,
                       lock->l_resource->lr_lvb_len);
                up(&lock->l_resource->lr_lvb_sem);
        }

        LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
                   total_enqueue_wait);
        req->rq_replen = lustre_msg_size(0, NULL);

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */

        /* We only send real blocking ASTs after the lock is granted */
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (lock->l_flags & LDLM_FL_AST_SENT) {
                body->lock_flags |= LDLM_FL_AST_SENT;
                ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
        }
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        rc = ptlrpc_queue_wait(req);
        if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "completion");

        ptlrpc_req_finished(req);

        RETURN(rc);
}
516
/* Ask the client holding 'lock' for its current LVB via a glimpse AST.
 * On success the resource's LVB is updated from the reply;
 * -ELDLM_NO_LOCK_DATA from the client is treated as a benign race. */
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int rc = 0, size = sizeof(*body);
        ENTRY;

        LASSERT(lock != NULL);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK,
                              1, &size, NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        ldlm_lock2desc(lock, &body->lock_desc);

        /* Size the reply to carry the client's LVB. */
        down(&lock->l_resource->lr_lvb_sem);
        size = lock->l_resource->lr_lvb_len;
        up(&lock->l_resource->lr_lvb_sem);
        req->rq_replen = lustre_msg_size(1, &size);

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */

        rc = ptlrpc_queue_wait(req);
        if (rc == -ELDLM_NO_LOCK_DATA)
                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
        else if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
        else
                /* NOTE(review): assumes ns_lvbo is non-NULL for any
                 * namespace that issues glimpses -- confirm at callers. */
                rc = res->lr_namespace->ns_lvbo->lvbo_update
                        (res, req->rq_repmsg, 0, 1);
        ptlrpc_req_finished(req);
        RETURN(rc);
}
557
558 static struct ldlm_lock *
559 find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl)
560 {
561         struct obd_device *obd = exp->exp_obd;
562         struct list_head *iter;
563
564         l_lock(&obd->obd_namespace->ns_lock);
565         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
566                 struct ldlm_lock *lock;
567                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
568                 if (lock->l_remote_handle.cookie == remote_hdl->cookie) {
569                         LDLM_LOCK_GET(lock);
570                         l_unlock(&obd->obd_namespace->ns_lock);
571                         return lock;
572                 }
573         }
574         l_unlock(&obd->obd_namespace->ns_lock);
575         return NULL;
576 }
577
578
/* Server-side enqueue handler.
 *
 * Unpacks the ldlm_request, creates a new lock (or, for
 * LDLM_FL_REPLAY, finds the already-granted one), attaches it to the
 * requesting export, runs ldlm_lock_enqueue() and packs the reply:
 * flags, lock desc, server handle and (if the resource has one) the
 * LVB.  The DLM-level result travels in req->rq_status; the return
 * value is the RPC-level rc. */
int ldlm_handle_enqueue(struct ptlrpc_request *req,
                        ldlm_completion_callback completion_callback,
                        ldlm_blocking_callback blocking_callback,
                        ldlm_glimpse_callback glimpse_callback)
{
        struct obd_device *obddev = req->rq_export->exp_obd;
        struct ldlm_reply *dlm_rep;
        struct ldlm_request *dlm_req;
        int rc = 0, size[2] = {sizeof(*dlm_rep)};
        __u32 flags;
        ldlm_error_t err = ELDLM_OK;
        struct ldlm_lock *lock = NULL;
        void *cookie = NULL;
        ENTRY;

        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");

        dlm_req = lustre_swab_reqbuf (req, MDS_REQ_INTENT_LOCKREQ_OFF,
                                      sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
                GOTO(out, rc = -EFAULT);
        }

        flags = dlm_req->lock_flags;

        LASSERT(req->rq_export);

        /* Recovery replay: re-use the lock we granted before. */
        if (flags & LDLM_FL_REPLAY) {
                lock = find_existing_lock(req->rq_export,
                                          &dlm_req->lock_handle1);
                if (lock != NULL) {
                        DEBUG_REQ(D_HA, req, "found existing lock cookie "LPX64,
                                  lock->l_handle.h_cookie);
                        GOTO(existing_lock, rc = 0);
                }
        }

        /* The lock's callback data might be set in the policy function */
        lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2,
                                dlm_req->lock_desc.l_resource.lr_name,
                                dlm_req->lock_desc.l_resource.lr_type,
                                dlm_req->lock_desc.l_req_mode,
                                blocking_callback, completion_callback,
                                glimpse_callback, NULL, 0);
        if (!lock)
                GOTO(out, rc = -ENOMEM);

        do_gettimeofday(&lock->l_enqueued_time);
        memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
               sizeof(lock->l_remote_handle));
        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");

        LASSERT(req->rq_export);
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (req->rq_export->exp_failed) {
                /* NOTE(review): this sets err but leaves rc == 0; at
                 * 'out' a successful lustre_pack_reply() then overwrites
                 * rq_status with 0, losing -ENOTCONN -- confirm this is
                 * intended. */
                LDLM_ERROR(lock,"lock on destroyed export %p\n",req->rq_export);
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                GOTO(out, err = -ENOTCONN);
        }
        lock->l_export = class_export_get(req->rq_export);

        /* Track the lock on its export so it is cleaned up when the
         * client goes away. */
        list_add(&lock->l_export_chain,
                 &lock->l_export->exp_ldlm_data.led_held_locks);
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

existing_lock:

        if (flags & LDLM_FL_HAS_INTENT) {
                /* In this case, the reply buffer is allocated deep in
                 * local_lock_enqueue by the policy function. */
                cookie = req;
        } else {
                int buffers = 1;
                /* Reserve reply space for the LVB if the resource has one. */
                down(&lock->l_resource->lr_lvb_sem);
                if (lock->l_resource->lr_lvb_len) {
                        size[1] = lock->l_resource->lr_lvb_len;
                        buffers = 2;
                }
                up(&lock->l_resource->lr_lvb_sem);
                if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                        GOTO(out, rc = -ENOMEM);

                rc = lustre_pack_reply(req, buffers, size, NULL);
                if (rc)
                        GOTO(out, rc);
        }

        /* Copy type-specific policy data from the request. */
        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
                memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
                       sizeof(ldlm_policy_data_t));
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
                       sizeof(lock->l_req_extent));

        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
        if (err)
                GOTO(out, err);

        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
        dlm_rep->lock_flags = flags;

        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
        ldlm_lock2handle(lock, &dlm_rep->lock_handle);

        /* We never send a blocking AST until the lock is granted, but
         * we can tell it right now */
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (lock->l_flags & LDLM_FL_AST_SENT) {
                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                if (lock->l_granted_mode == lock->l_req_mode)
                        ldlm_add_waiting_lock(lock);
        }
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        EXIT;
 out:
        req->rq_status = err;
        /* If no reply was packed yet (early failure), pack an empty one. */
        if (req->rq_reply_state == NULL) {
                err = lustre_pack_reply(req, 0, NULL, NULL);
                if (rc == 0)
                        rc = err;
                req->rq_status = rc;
        }

        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
        if (lock) {
                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                           "(err=%d, rc=%d)", err, rc);
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);

                if (rc == 0) {
                        /* Fill in the LVB space reserved above. */
                        down(&lock->l_resource->lr_lvb_sem);
                        size[1] = lock->l_resource->lr_lvb_len;
                        if (size[1] > 0) {
                                void *lvb = lustre_msg_buf(req->rq_repmsg,
                                                           1, size[1]);
                                LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                         req, lock);

                                memcpy(lvb, lock->l_resource->lr_lvb_data,
                                       size[1]);
                        }
                        up(&lock->l_resource->lr_lvb_sem);
                } else {
                        ldlm_lock_destroy(lock);
                }

                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                        ldlm_reprocess_all(lock->l_resource);
                LDLM_LOCK_PUT(lock);
        }
        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
                          lock, rc);

        return rc;
}
740
/* Server-side convert handler: change an existing lock's mode as the
 * client requests.  rq_status carries the DLM result (0 on success,
 * EINVAL for a stale handle, EDEADLOCK when the convert can't be
 * granted); the RPC itself returns 0 unless unpack/pack fails. */
int ldlm_handle_convert(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
        int rc, size = sizeof(*dlm_rep);
        ENTRY;

        dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
                RETURN (-EFAULT);
        }

        rc = lustre_pack_reply(req, 1, &size, NULL);
        if (rc) {
                CERROR("out of memory\n");
                RETURN(-ENOMEM);
        }
        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
        dlm_rep->lock_flags = dlm_req->lock_flags;

        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
        if (!lock) {
                /* Stale handle; rq_status uses a positive errno here. */
                req->rq_status = EINVAL;
        } else {
                void *res = NULL;

                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                LDLM_DEBUG(lock, "server-side convert handler START");
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);

                res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                        &dlm_rep->lock_flags);
                if (res) {
                        /* A converted lock no longer awaits a cancel
                         * callback; stop its eviction timer. */
                        l_lock(&lock->l_resource->lr_namespace->ns_lock);
                        if (ldlm_del_waiting_lock(lock))
                                CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
                        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                        req->rq_status = 0;
                } else {
                        req->rq_status = EDEADLOCK;
                }
        }

        if (lock) {
                if (!req->rq_status)
                        ldlm_reprocess_all(lock->l_resource);
                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                LDLM_DEBUG(lock, "server-side convert handler END");
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                LDLM_LOCK_PUT(lock);
        } else
                LDLM_DEBUG_NOLOCK("server-side convert handler END");

        RETURN(0);
}
799
800 int ldlm_handle_cancel(struct ptlrpc_request *req)
801 {
802         struct ldlm_request *dlm_req;
803         struct ldlm_lock *lock;
804         struct ldlm_resource *res;
805         int rc;
806         ENTRY;
807
808         dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
809                                       lustre_swab_ldlm_request);
810         if (dlm_req == NULL) {
811                 CERROR("bad request buffer for cancel\n");
812                 RETURN(-EFAULT);
813         }
814
815         rc = lustre_pack_reply(req, 0, NULL, NULL);
816         if (rc) {
817                 CERROR("out of memory\n");
818                 RETURN(-ENOMEM);
819         }
820
821         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
822         if (!lock) {
823                 CERROR("received cancel for unknown lock cookie "LPX64
824                        " from client %s id %s\n",
825                        dlm_req->lock_handle1.cookie,
826                        req->rq_export->exp_client_uuid.uuid,
827                        req->rq_peerstr);
828                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
829                                   "(cookie "LPU64")",
830                                   dlm_req->lock_handle1.cookie);
831                 req->rq_status = ESTALE;
832         } else {
833                 LDLM_DEBUG(lock, "server-side cancel handler START");
834                 res = lock->l_resource;
835                 if (res && res->lr_namespace->ns_lvbo &&
836                     res->lr_namespace->ns_lvbo->lvbo_update) {
837                         (void)res->lr_namespace->ns_lvbo->lvbo_update
838                                 (res, NULL, 0, 0);
839                                 //(res, req->rq_reqmsg, 1);
840                 }
841
842                 l_lock(&res->lr_namespace->ns_lock);
843                 ldlm_lock_cancel(lock);
844                 if (ldlm_del_waiting_lock(lock))
845                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
846                 l_unlock(&res->lr_namespace->ns_lock);
847                 req->rq_status = rc;
848         }
849
850         if (ptlrpc_reply(req) != 0)
851                 LBUG();
852
853         if (lock) {
854                 ldlm_reprocess_all(lock->l_resource);
855                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
856                 LDLM_DEBUG(lock, "server-side cancel handler END");
857                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
858                 LDLM_LOCK_PUT(lock);
859         }
860
861         RETURN(0);
862 }
863
/* Client-side handling of a blocking AST from the server.
 *
 * Marks the lock CBPENDING; if the lock is already unused (no readers or
 * writers) its blocking AST is invoked immediately with LDLM_CB_BLOCKING.
 * Otherwise the cancel is deferred until the last reference is dropped.
 *
 * Consumes one reference on 'lock' (LDLM_LOCK_PUT at the end) — callers
 * hand their reference over to this function. */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
        int do_ast;
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client blocking AST callback handler START");

        lock->l_flags |= LDLM_FL_CBPENDING;
        do_ast = (!lock->l_readers && !lock->l_writers);

        if (do_ast) {
                LDLM_DEBUG(lock, "already unused, calling "
                           "callback (%p)", lock->l_blocking_ast);
                if (lock->l_blocking_ast != NULL) {
                        /* The AST may block or take other locks, so drop
                         * ns_lock across the call and re-take it after. */
                        l_unlock(&ns->ns_lock);
                        l_check_no_ns_lock(ns);
                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,
                                             LDLM_CB_BLOCKING);
                        l_lock(&ns->ns_lock);
                }
        } else {
                LDLM_DEBUG(lock, "Lock still has references, will be"
                           " cancelled later");
        }

        LDLM_DEBUG(lock, "client blocking callback handler END");
        l_unlock(&ns->ns_lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}
896
/* Client-side handling of a completion AST: the server has granted the
 * lock (possibly with a different mode, policy, or resource than the one
 * originally requested), so bring the local copy up to date and grant it.
 *
 * Consumes one reference on 'lock'. */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        LIST_HEAD(ast_list);
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client completion callback handler START");

        /* If we receive the completion AST before the actual enqueue returned,
         * then we might need to switch lock modes, resources, or extents. */
        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                LDLM_DEBUG(lock, "completion AST, new lock mode");
        }

        /* Plain locks carry no policy data; all other types copy the
         * server's view of the policy (extent, flock, ...). */
        if (lock->l_resource->lr_type != LDLM_PLAIN) {
                memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
                       sizeof(lock->l_policy_data));
                LDLM_DEBUG(lock, "completion AST, new policy data");
        }

        ldlm_resource_unlink_lock(lock);
        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                   &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) != 0) {
                ldlm_lock_change_resource(ns, lock,
                                         dlm_req->lock_desc.l_resource.lr_name);
                LDLM_DEBUG(lock, "completion AST, new resource");
        }

        /* A piggy-backed blocking AST: mark the lock for cancellation as
         * soon as it becomes unused. */
        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
                lock->l_flags |= LDLM_FL_CBPENDING;
                LDLM_DEBUG(lock, "completion AST includes blocking AST");
        }

        /* If this lock expects an LVB, it travels in request buffer 1. */
        if (lock->l_lvb_len) {
                void *lvb;
                lvb = lustre_swab_reqbuf(req, 1, lock->l_lvb_len,
                                         lock->l_lvb_swabber);
                if (lvb == NULL) {
                        LDLM_ERROR(lock, "completion AST did not contain "
                                   "expected LVB!");
                } else {
                        memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
                }
        }

        /* lr_tmp collects deferred AST work generated by the grant; it is
         * run below after ns_lock is dropped. */
        lock->l_resource->lr_tmp = &ast_list;
        ldlm_grant_lock(lock, req, sizeof(*req), 1);
        lock->l_resource->lr_tmp = NULL;
        LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
        l_unlock(&ns->ns_lock);
        LDLM_LOCK_PUT(lock);

        ldlm_run_ast_work(ns, &ast_list);

        /* NOLOCK variant: 'lock' may already be freed after the PUT above,
         * so only the pointer value is logged. */
        LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                          lock);
        EXIT;
}
960
/* Client-side handling of a glimpse AST: invoke the lock's glimpse
 * callback (which is expected to fill in the reply) and send the reply
 * from here.  If no glimpse callback is registered, the request is failed
 * with -ENOSYS.
 *
 * As a side effect, an unused PW lock that has been idle for more than
 * ten seconds is handed to the blocking-AST path for cancellation; in
 * that case the lock reference is consumed by ldlm_handle_bl_callback()
 * (directly or via the BL thread) rather than by the PUT below. */
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int rc = -ENOSYS;
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client glimpse AST callback handler");

        if (lock->l_glimpse_ast != NULL) {
                /* Drop ns_lock across the callback, as it may block. */
                l_unlock(&ns->ns_lock);
                l_check_no_ns_lock(ns);
                rc = lock->l_glimpse_ast(lock, req);
                l_lock(&ns->ns_lock);
        }

        /* The glimpse AST normally packs the reply itself; if it did not,
         * fail the request with its return code. */
        if (req->rq_repmsg != NULL) {
                ptlrpc_reply(req);
        } else {
                req->rq_status = rc;
                ptlrpc_error(req);
        }

        l_unlock(&ns->ns_lock);
        if (lock->l_granted_mode == LCK_PW &&
            !lock->l_readers && !lock->l_writers &&
            time_after(jiffies, lock->l_last_used + 10 * HZ)) {
                /* Fall back to handling the cancel inline if it could not
                 * be queued to the BL thread pool. */
                if (ldlm_bl_to_thread(ns, NULL, lock))
                        ldlm_handle_bl_callback(ns, NULL, lock);

                EXIT;
                return;
        }
        LDLM_LOCK_PUT(lock);
        EXIT;
}
999
1000 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
1001 {
1002         req->rq_status = rc;
1003         if (req->rq_reply_state == NULL) {
1004                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1005                 if (rc)
1006                         return rc;
1007         }
1008         return ptlrpc_reply(req);
1009 }
1010
/* Queue a blocking-AST work item for the dedicated BL thread pool, so the
 * blocking callback runs outside the ptlrpc service thread.
 *
 * Returns 0 when the item was queued (a BL thread will consume the lock
 * reference), -ENOMEM on allocation failure, or -ENOSYS when compiled
 * without kernel thread support (liblustre) — callers fall back to
 * calling ldlm_handle_bl_callback() inline on non-zero return. */
int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                      struct ldlm_lock *lock)
{
#ifdef __KERNEL__
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
        struct ldlm_bl_work_item *blwi;
        ENTRY;

        OBD_ALLOC(blwi, sizeof(*blwi));
        if (blwi == NULL)
                RETURN(-ENOMEM);

        blwi->blwi_ns = ns;
        /* ld may be NULL (e.g. the glimpse-triggered cancel path); the
         * descriptor then stays as OBD_ALLOC left it — presumably zeroed;
         * verify against OBD_ALLOC's definition. */
        if (ld != NULL)
                blwi->blwi_ld = *ld;
        blwi->blwi_lock = lock;

        spin_lock(&blp->blp_lock);
        list_add_tail(&blwi->blwi_entry, &blp->blp_list);
        wake_up(&blp->blp_waitq);
        spin_unlock(&blp->blp_lock);

        RETURN(0);
#else
        RETURN(-ENOSYS);
#endif

}
1039
1040 static int ldlm_msg_check_version(struct lustre_msg *msg)
1041 {
1042         int rc;
1043
1044         switch (msg->opc) {
1045         case LDLM_ENQUEUE:
1046         case LDLM_CONVERT:
1047         case LDLM_CANCEL:
1048         case LDLM_BL_CALLBACK:
1049         case LDLM_CP_CALLBACK:
1050         case LDLM_GL_CALLBACK:
1051                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1052                 if (rc)
1053                         CERROR("bad opc %u version %08x, expecting %08x\n",
1054                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
1055                 break;
1056         case OBD_LOG_CANCEL:
1057         case LLOG_ORIGIN_HANDLE_OPEN:
1058         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1059         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1060         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1061         case LLOG_ORIGIN_HANDLE_CLOSE:
1062         case LLOG_CATINFO:
1063                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1064                 if (rc)
1065                         CERROR("bad opc %u version %08x, expecting %08x\n",
1066                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
1067                 break;
1068         default:
1069                 CERROR("LDLM unknown opcode %d\n", msg->opc);
1070                 rc = -ENOTSUPP;
1071                 break;
1072         }
1073
1074         return rc;
1075 }
1076
/* Service handler for the "ldlm_cbd" callback service.
 *
 * Dispatches client-directed DLM callbacks (blocking, completion, and
 * glimpse ASTs) as well as a handful of llog origin-handle operations
 * that share this service.  Always returns 0 once the request has been
 * answered (even when the answer is an error status); a non-zero return
 * happens only for a malformed (wrong-version) request. */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
        struct ldlm_namespace *ns;
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        int rc;
        ENTRY;

        rc = ldlm_msg_check_version(req->rq_reqmsg);
        if (rc) {
                CERROR("LDLM_CB drop mal-formed request\n");
                RETURN(rc);
        }

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        /* No export: the sender's connection is gone (e.g. this node
         * rebooted while holding a lock).  Answer -ENOTCONN. */
        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;

                CDEBUG(D_RPCTRACE, "operation %d from %s with bad "
                       "export cookie "LPX64"; this is "
                       "normal if this node rebooted with a lock held\n",
                       req->rq_reqmsg->opc,
                       req->rq_peerstr,
                       req->rq_reqmsg->handle.cookie);
                dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                             lustre_swab_ldlm_request);
                if (dlm_req != NULL)
                        CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
                               dlm_req->lock_handle1.cookie);

                ldlm_callback_reply(req, -ENOTCONN);
                RETURN(0);
        }

        LASSERT(req->rq_export != NULL);
        LASSERT(req->rq_export->exp_obd != NULL);

        /* First pass: fault injection for DLM callbacks, and complete
         * handling of the llog opcodes (which need no lock lookup). */
        switch(req->rq_reqmsg->opc) {
        case LDLM_BL_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                break;
        case LDLM_CP_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
                break;
        case LDLM_GL_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
                break;
        case OBD_LOG_CANCEL:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
                rc = llog_origin_handle_cancel(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_OPEN:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_open(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_next_block(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_prev_block(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_READ_HEADER:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_read_header(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_CLOSE:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_close(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        default:
                CERROR("unknown opcode %u\n", req->rq_reqmsg->opc);
                ldlm_callback_reply(req, -EPROTO);
                RETURN(0);
        }

        ns = req->rq_export->exp_obd->obd_namespace;
        LASSERT(ns != NULL);

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("can't unpack dlm_req\n");
                ldlm_callback_reply (req, -EPROTO);
                RETURN (0);
        }

        /* Takes a lock reference; each handler below is responsible for
         * releasing it. */
        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
        if (!lock) {
                LDLM_DEBUG_NOLOCK("callback on lock "LPX64" - lock "
                                  "disappeared\n",dlm_req->lock_handle1.cookie);
                ldlm_callback_reply(req, -EINVAL);
                RETURN(0);
        }

        /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
        lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);

        /* We want the ost thread to get this reply so that it can respond
         * to ost requests (write cache writeback) that might be triggered
         * in the callback.
         *
         * But we'd also like to be able to indicate in the reply that we're
         * cancelling right now, because it's unused, or have an intent result
         * in the reply, so we might have to push the responsibility for sending
         * the reply down into the AST handlers, alas. */

        switch (req->rq_reqmsg->opc) {
        case LDLM_BL_CALLBACK:
                CDEBUG(D_INODE, "blocking ast\n");
                /* Reply first; the blocking AST itself runs in a BL thread
                 * (or inline if it could not be queued). */
                ldlm_callback_reply(req, 0);
                if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock))
                        ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);

                break;
        case LDLM_CP_CALLBACK:
                CDEBUG(D_INODE, "completion ast\n");
                ldlm_callback_reply(req, 0);
                ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                break;
        case LDLM_GL_CALLBACK:
                CDEBUG(D_INODE, "glimpse ast\n");
                /* The glimpse handler sends its own reply. */
                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
                break;
        default:
                LBUG();                         /* checked above */
        }

        RETURN(0);
}
1218
/* Service handler for the "ldlm_canceld" cancellation service.
 *
 * Only LDLM_CANCEL is accepted; everything else is answered with -EINVAL.
 * Requests with no export (stale connection) are answered -ENOTCONN after
 * dumping the offending lock handle for diagnosis. */
static int ldlm_cancel_handler(struct ptlrpc_request *req)
{
        int rc = 0;
        ENTRY;

        rc = ldlm_msg_check_version(req->rq_reqmsg);
        if (rc) {
                CERROR("LDLM_CL drop mal-formed request\n");
                RETURN(rc);
        }

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;
                CERROR("operation %d with bad export from %s\n",
                       req->rq_reqmsg->opc,
                       req->rq_peerstr);
                CERROR("--> export cookie: "LPX64"\n",
                       req->rq_reqmsg->handle.cookie);
                dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                             lustre_swab_ldlm_request);
                if (dlm_req != NULL)
                        ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
                ldlm_callback_reply(req, -ENOTCONN);
                RETURN(0);
        }

        switch (req->rq_reqmsg->opc) {

        /* XXX FIXME move this back to mds/handler.c, bug 249 */
        case LDLM_CANCEL:
                CDEBUG(D_INODE, "cancel\n");
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
                /* ldlm_handle_cancel() sends the reply itself. */
                rc = ldlm_handle_cancel(req);
                break;
        default:
                CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
                ldlm_callback_reply(req, -EINVAL);
        }

        RETURN(rc);
}
1265
1266 #ifdef __KERNEL__
1267 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
1268 {
1269         struct ldlm_bl_work_item *blwi = NULL;
1270
1271         spin_lock(&blp->blp_lock);
1272         if (!list_empty(&blp->blp_list)) {
1273                 blwi = list_entry(blp->blp_list.next, struct ldlm_bl_work_item,
1274                                   blwi_entry);
1275                 list_del(&blwi->blwi_entry);
1276         }
1277         spin_unlock(&blp->blp_lock);
1278
1279         return blwi;
1280 }
1281
/* Startup arguments for ldlm_bl_thread_main(); lives on the stack of
 * ldlm_setup() until the new thread signals blp_comp. */
struct ldlm_bl_thread_data {
        int                     bltd_num;       /* thread ordinal, used in its name */
        struct ldlm_bl_pool     *bltd_blp;      /* pool this thread services */
};
1286
/* Main loop of a blocking-AST ("ldlm_bl_NN") worker thread.
 *
 * Daemonizes, blocks all signals, then repeatedly waits for work on the
 * pool queue and runs ldlm_handle_bl_callback() for each item.  A work
 * item with a NULL blwi_ns is the shutdown sentinel queued by
 * ldlm_cleanup(); it is deliberately not freed because the sentinel lives
 * on ldlm_cleanup()'s stack.  blp_comp is signalled both at startup and
 * at exit so the pool can synchronize with thread lifetime. */
static int ldlm_bl_thread_main(void *arg)
{
        struct ldlm_bl_thread_data *bltd = arg;
        struct ldlm_bl_pool *blp = bltd->bltd_blp;
        unsigned long flags;
        ENTRY;

        /* XXX boiler-plate */
        {
                char name[sizeof(current->comm)];
                snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
                         bltd->bltd_num);
                kportal_daemonize(name);
        }
        /* Block all signals: this thread must only ever exit via the
         * shutdown sentinel. */
        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);

        atomic_inc(&blp->blp_num_threads);
        complete(&blp->blp_comp);

        while(1) {
                struct l_wait_info lwi = { 0 };
                struct ldlm_bl_work_item *blwi = NULL;

                /* Exclusive wait so one queued item wakes one thread. */
                l_wait_event_exclusive(blp->blp_waitq,
                                       (blwi = ldlm_bl_get_work(blp)) != NULL,
                                       &lwi);

                if (blwi->blwi_ns == NULL)
                        break;

                ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                        blwi->blwi_lock);
                OBD_FREE(blwi, sizeof(*blwi));
        }

        atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
        RETURN(0);
}
1329
1330 #endif
1331
1332 static int ldlm_setup(void);
1333 static int ldlm_cleanup(int force);
1334
1335 int ldlm_get_ref(void)
1336 {
1337         int rc = 0;
1338         down(&ldlm_ref_sem);
1339         if (++ldlm_refcount == 1) {
1340                 rc = ldlm_setup();
1341                 if (rc)
1342                         ldlm_refcount--;
1343         }
1344         up(&ldlm_ref_sem);
1345
1346         RETURN(rc);
1347 }
1348
1349 void ldlm_put_ref(int force)
1350 {
1351         down(&ldlm_ref_sem);
1352         if (ldlm_refcount == 1) {
1353                 int rc = ldlm_cleanup(force);
1354                 if (rc)
1355                         CERROR("ldlm_cleanup failed: %d\n", rc);
1356                 else
1357                         ldlm_refcount--;
1358         } else {
1359                 ldlm_refcount--;
1360         }
1361         up(&ldlm_ref_sem);
1362
1363         EXIT;
1364 }
1365
1366 static int ldlm_setup(void)
1367 {
1368         struct ldlm_bl_pool *blp;
1369         int rc = 0;
1370 #ifdef __KERNEL__
1371         int i;
1372 #endif
1373         ENTRY;
1374
1375         if (ldlm_state != NULL)
1376                 RETURN(-EALREADY);
1377
1378         OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
1379         if (ldlm_state == NULL)
1380                 RETURN(-ENOMEM);
1381
1382 #ifdef __KERNEL__
1383         rc = ldlm_proc_setup();
1384         if (rc != 0)
1385                 GOTO(out_free, rc);
1386 #endif
1387
1388         ldlm_state->ldlm_cb_service =
1389                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
1390                                 LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1391                                 1500, ldlm_callback_handler, "ldlm_cbd",
1392                                 ldlm_svc_proc_dir);
1393
1394         if (!ldlm_state->ldlm_cb_service) {
1395                 CERROR("failed to start service\n");
1396                 GOTO(out_proc, rc = -ENOMEM);
1397         }
1398
1399         ldlm_state->ldlm_cancel_service =
1400                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
1401                                 LDLM_CANCEL_REQUEST_PORTAL,
1402                                 LDLM_CANCEL_REPLY_PORTAL, 30000,
1403                                 ldlm_cancel_handler, "ldlm_canceld",
1404                                 ldlm_svc_proc_dir);
1405
1406         if (!ldlm_state->ldlm_cancel_service) {
1407                 CERROR("failed to start service\n");
1408                 GOTO(out_proc, rc = -ENOMEM);
1409         }
1410
1411         OBD_ALLOC(blp, sizeof(*blp));
1412         if (blp == NULL)
1413                 GOTO(out_proc, rc = -ENOMEM);
1414         ldlm_state->ldlm_bl_pool = blp;
1415
1416         atomic_set(&blp->blp_num_threads, 0);
1417         init_waitqueue_head(&blp->blp_waitq);
1418         spin_lock_init(&blp->blp_lock);
1419
1420         INIT_LIST_HEAD(&blp->blp_list);
1421
1422 #ifdef __KERNEL__
1423         for (i = 0; i < LDLM_NUM_THREADS; i++) {
1424                 struct ldlm_bl_thread_data bltd = {
1425                         .bltd_num = i,
1426                         .bltd_blp = blp,
1427                 };
1428                 init_completion(&blp->blp_comp);
1429                 rc = kernel_thread(ldlm_bl_thread_main, &bltd, 0);
1430                 if (rc < 0) {
1431                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
1432                         GOTO(out_thread, rc);
1433                 }
1434                 wait_for_completion(&blp->blp_comp);
1435         }
1436
1437         rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
1438                                     LDLM_NUM_THREADS, "ldlm_cn");
1439         if (rc)
1440                 GOTO(out_thread, rc);
1441
1442         rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
1443                                     LDLM_NUM_THREADS, "ldlm_cb");
1444         if (rc)
1445                 GOTO(out_thread, rc);
1446
1447         INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
1448         spin_lock_init(&expired_lock_thread.elt_lock);
1449         expired_lock_thread.elt_state = ELT_STOPPED;
1450         init_waitqueue_head(&expired_lock_thread.elt_waitq);
1451
1452         rc = kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
1453         if (rc < 0) {
1454                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
1455                 GOTO(out_thread, rc);
1456         }
1457
1458         wait_event(expired_lock_thread.elt_waitq,
1459                    expired_lock_thread.elt_state == ELT_READY);
1460
1461         INIT_LIST_HEAD(&waiting_locks_list);
1462         spin_lock_init(&waiting_locks_spinlock);
1463         waiting_locks_timer.function = waiting_locks_callback;
1464         waiting_locks_timer.data = 0;
1465         init_timer(&waiting_locks_timer);
1466 #endif
1467
1468         RETURN(0);
1469
1470 #ifdef __KERNEL__
1471  out_thread:
1472         ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
1473         ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1474 #endif
1475
1476  out_proc:
1477 #ifdef __KERNEL__
1478         ldlm_proc_cleanup();
1479  out_free:
1480 #endif
1481         OBD_FREE(ldlm_state, sizeof(*ldlm_state));
1482         ldlm_state = NULL;
1483         return rc;
1484 }
1485
/* Tear down the LDLM subsystem, run on the last ldlm_put_ref().
 *
 * Refuses (-EBUSY) while any namespace still exists.  Otherwise stops the
 * BL thread pool (one on-stack sentinel work item per thread), stops and
 * unregisters both ptlrpc services, removes /proc entries, terminates the
 * expired-lock thread, and frees ldlm_state.  'force' is accepted for the
 * ldlm_put_ref() interface but not consulted here. */
static int ldlm_cleanup(int force)
{
#ifdef __KERNEL__
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
#endif
        ENTRY;

        if (!list_empty(&ldlm_namespace_list)) {
                CERROR("ldlm still has namespaces; clean these up first.\n");
                ldlm_dump_all_namespaces(D_DLMTRACE);
                RETURN(-EBUSY);
        }

#ifdef __KERNEL__
        while (atomic_read(&blp->blp_num_threads) > 0) {
                /* NULL blwi_ns is the shutdown sentinel; it is stack-
                 * allocated, so the exiting thread must not free it. */
                struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

                init_completion(&blp->blp_comp);

                spin_lock(&blp->blp_lock);
                list_add_tail(&blwi.blwi_entry, &blp->blp_list);
                wake_up(&blp->blp_waitq);
                spin_unlock(&blp->blp_lock);

                /* Wait for one thread to exit before reusing the on-stack
                 * sentinel for the next one. */
                wait_for_completion(&blp->blp_comp);
        }
        OBD_FREE(blp, sizeof(*blp));

        ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
        ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
        ldlm_proc_cleanup();

        expired_lock_thread.elt_state = ELT_TERMINATE;
        wake_up(&expired_lock_thread.elt_waitq);
        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_STOPPED);
#else
        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
#endif

        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;

        RETURN(0);
}
1534
1535 int __init ldlm_init(void)
1536 {
1537         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1538                                                sizeof(struct ldlm_resource), 0,
1539                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
1540         if (ldlm_resource_slab == NULL)
1541                 return -ENOMEM;
1542
1543         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1544                                            sizeof(struct ldlm_lock), 0,
1545                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
1546         if (ldlm_lock_slab == NULL) {
1547                 kmem_cache_destroy(ldlm_resource_slab);
1548                 return -ENOMEM;
1549         }
1550
1551         l_lock_init(&ldlm_handle_lock);
1552
1553         return 0;
1554 }
1555
1556 void __exit ldlm_exit(void)
1557 {
1558         if ( ldlm_refcount )
1559                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1560         LASSERTF(kmem_cache_destroy(ldlm_resource_slab) == 0,
1561                  "couldn't free ldlm resource slab\n");
1562         LASSERTF(kmem_cache_destroy(ldlm_lock_slab) == 0,
1563                  "couldn't free ldlm lock slab\n");
1564
1565 }
1566
/* Exported entry points of the LDLM module, grouped by implementing file. */

/* ldlm_flock.c */
EXPORT_SYMBOL(ldlm_flock_completion_ast);

/* ldlm_extent.c */
EXPORT_SYMBOL(ldlm_extent_shift_kms);

/* ldlm_lock.c */
EXPORT_SYMBOL(ldlm_get_processing_policy);
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_get);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_set_data);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match);

/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_change_cbdata);

/* ldlm_lockd.c */
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_server_glimpse_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_del_waiting_lock);
EXPORT_SYMBOL(ldlm_get_ref);
EXPORT_SYMBOL(ldlm_put_ref);

#if 0
/* ldlm_test.c */
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_regression_start);
EXPORT_SYMBOL(ldlm_regression_stop);
#endif

/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);
EXPORT_SYMBOL(ldlm_dump_all_namespaces);
EXPORT_SYMBOL(ldlm_resource_get);
EXPORT_SYMBOL(ldlm_resource_putref);

/* l_lock.c */
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);

/* ldlm_lib.c */
EXPORT_SYMBOL(client_import_add_conn);
EXPORT_SYMBOL(client_import_del_conn);
EXPORT_SYMBOL(client_obd_setup);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
EXPORT_SYMBOL(target_start_recovery_thread);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cleanup_recovery);
EXPORT_SYMBOL(target_destroy_export);
EXPORT_SYMBOL(target_cancel_recovery_timer);
EXPORT_SYMBOL(target_send_reply);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_handle_ping);
EXPORT_SYMBOL(target_handle_disconnect);
EXPORT_SYMBOL(target_queue_final_reply);