Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LDLM
26
27 #ifdef __KERNEL__
28 # include <linux/module.h>
29 # include <linux/slab.h>
30 # include <linux/init.h>
31 #else
32 # include <liblustre.h>
33 #endif
34
35 #include <linux/lustre_dlm.h>
36 #include <linux/obd_class.h>
37
38 extern kmem_cache_t *ldlm_resource_slab;
39 extern kmem_cache_t *ldlm_lock_slab;
40 extern struct lustre_lock ldlm_handle_lock;
41 extern struct list_head ldlm_namespace_list;
42 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
43 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
44
/*
 * Round a jiffies value up to the start of the next whole second.
 *
 * The result is always strictly greater than the input: a value that already
 * lies on a second boundary is pushed to the following boundary.
 *
 * Declared static because a bare "inline" at file scope either leaks the
 * symbol into the global namespace (GNU89 semantics) or provides no callable
 * definition at all (C99 and later).  The function is not among the symbols
 * this file exports.
 */
static inline unsigned long round_timeout(unsigned long timeout)
{
        return ((timeout / HZ) + 1) * HZ;
}
49
50 /* XXX should this be per-ldlm? */
51 static struct list_head waiting_locks_list;
52 static spinlock_t waiting_locks_spinlock;
53 static struct timer_list waiting_locks_timer;
54 static int ldlm_already_setup = 0;
55
56 static void waiting_locks_callback(unsigned long unused)
57 {
58         struct list_head *liter, *n;
59
60         spin_lock_bh(&waiting_locks_spinlock);
61         list_for_each_safe(liter, n, &waiting_locks_list) {
62                 struct ldlm_lock *l = list_entry(liter, struct ldlm_lock,
63                                                  l_pending_chain);
64                 if (l->l_callback_timeout > jiffies)
65                         break;
66                 CERROR("lock timer expired, lock %p\n", l);
67                 LDLM_DEBUG(l, "timer expired, recovering exp %p on conn %p",
68                            l->l_export, l->l_export->exp_connection);
69                 recovd_conn_fail(l->l_export->exp_connection);
70         }
71         spin_unlock_bh(&waiting_locks_spinlock);
72 }
73
74 /*
75  * Indicate that we're waiting for a client to call us back cancelling a given
76  * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
77  * timer to fire appropriately.  (We round up to the next second, to avoid
78  * floods of timer firings during periods of high lock contention and traffic).
79  */
80 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
81 {
82         unsigned long timeout_rounded;
83         ENTRY;
84
85         LASSERT(list_empty(&lock->l_pending_chain));
86
87         spin_lock_bh(&waiting_locks_spinlock);
88         lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
89
90         timeout_rounded = round_timeout(lock->l_callback_timeout);
91
92         if (timeout_rounded < waiting_locks_timer.expires ||
93             !timer_pending(&waiting_locks_timer)) {
94                 mod_timer(&waiting_locks_timer, timeout_rounded);
95         }
96         list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
97         spin_unlock_bh(&waiting_locks_spinlock);
98         RETURN(1);
99 }
100
101 /*
102  * Remove a lock from the pending list, likely because it had its cancellation
103  * callback arrive without incident.  This adjusts the lock-timeout timer if
104  * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
105  */
106 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
107 {
108         struct list_head *list_next;
109
110         ENTRY;
111
112         spin_lock_bh(&waiting_locks_spinlock);
113
114         if (list_empty(&lock->l_pending_chain)) {
115                 spin_unlock_bh(&waiting_locks_spinlock);
116                 RETURN(0);
117         }
118
119         list_next = lock->l_pending_chain.next;
120         if (lock->l_pending_chain.prev == &waiting_locks_list) {
121                 /* Removing the head of the list, adjust timer. */
122                 if (list_next == &waiting_locks_list) {
123                         /* No more, just cancel. */
124                         del_timer(&waiting_locks_timer);
125                 } else {
126                         struct ldlm_lock *next;
127                         next = list_entry(list_next, struct ldlm_lock,
128                                           l_pending_chain);
129                         mod_timer(&waiting_locks_timer,
130                                   round_timeout(next->l_callback_timeout));
131                 }
132         }
133         list_del_init(&lock->l_pending_chain);
134         spin_unlock_bh(&waiting_locks_spinlock);
135         RETURN(1);
136 }
137
138 static inline void ldlm_failed_ast(struct ldlm_lock *lock)
139 {
140         /* XXX diagnostic */
141         recovd_conn_fail(lock->l_export->exp_connection);
142 }
143
144 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
145                              struct ldlm_lock_desc *desc,
146                              void *data, int flag)
147 {
148         struct ldlm_request *body;
149         struct ptlrpc_request *req;
150         int rc = 0, size = sizeof(*body);
151         ENTRY;
152
153         if (flag == LDLM_CB_CANCELING) {
154                 /* Don't need to do anything here. */
155                 RETURN(0);
156         }
157
158         LASSERT(lock);
159
160         l_lock(&lock->l_resource->lr_namespace->ns_lock);
161         /* XXX This is necessary because, with the lock re-tasking, we actually
162          * _can_ get called in here twice.  (bug 830) */
163         if (!list_empty(&lock->l_pending_chain)) {
164                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
165                 RETURN(0);
166         }
167
168         if (lock->l_destroyed) {
169                 /* What's the point? */
170                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
171                 RETURN(0);
172         }
173
174         req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
175                               LDLM_BL_CALLBACK, 1, &size, NULL);
176         if (!req)
177                 RETURN(-ENOMEM);
178
179         body = lustre_msg_buf(req->rq_reqmsg, 0);
180         memcpy(&body->lock_handle1, &lock->l_remote_handle,
181                sizeof(body->lock_handle1));
182         memcpy(&body->lock_desc, desc, sizeof(*desc));
183
184         LDLM_DEBUG(lock, "server preparing blocking AST");
185         req->rq_replen = lustre_msg_size(0, NULL);
186
187         ldlm_add_waiting_lock(lock);
188         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
189
190         req->rq_level = LUSTRE_CONN_RECOVD;
191         req->rq_timeout = 2;
192         rc = ptlrpc_queue_wait(req);
193         if (rc == -ETIMEDOUT || rc == -EINTR) {
194                 ldlm_del_waiting_lock(lock);
195                 ldlm_failed_ast(lock);
196         } else if (rc) {
197                 CERROR("client returned %d from blocking AST for lock %p\n",
198                        req->rq_status, lock);
199                 LDLM_DEBUG(lock, "client returned error %d from blocking AST",
200                            req->rq_status);
201                 ldlm_lock_cancel(lock);
202                 /* Server-side AST functions are called from ldlm_reprocess_all,
203                  * which needs to be told to please restart its reprocessing. */
204                 rc = -ERESTART;
205         }
206
207         ptlrpc_req_finished(req);
208
209         RETURN(rc);
210 }
211
212 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
213 {
214         struct ldlm_request *body;
215         struct ptlrpc_request *req;
216         int rc = 0, size = sizeof(*body);
217         ENTRY;
218
219         if (lock == NULL) {
220                 LBUG();
221                 RETURN(-EINVAL);
222         }
223
224         req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
225                               LDLM_CP_CALLBACK, 1, &size, NULL);
226         if (!req)
227                 RETURN(-ENOMEM);
228
229         body = lustre_msg_buf(req->rq_reqmsg, 0);
230         memcpy(&body->lock_handle1, &lock->l_remote_handle,
231                sizeof(body->lock_handle1));
232         body->lock_flags = flags;
233         ldlm_lock2desc(lock, &body->lock_desc);
234
235         LDLM_DEBUG(lock, "server preparing completion AST");
236         req->rq_replen = lustre_msg_size(0, NULL);
237
238         req->rq_level = LUSTRE_CONN_RECOVD;
239         req->rq_timeout = 2;
240         rc = ptlrpc_queue_wait(req);
241         if (rc == -ETIMEDOUT || rc == -EINTR) {
242                 ldlm_del_waiting_lock(lock);
243                 ldlm_failed_ast(lock);
244         } else if (rc) {
245                 CERROR("client returned %d from completion AST for lock %p\n",
246                        req->rq_status, lock);
247                 LDLM_DEBUG(lock, "client returned error %d from completion AST",
248                            req->rq_status);
249                 ldlm_lock_cancel(lock);
250                 /* Server-side AST functions are called from ldlm_reprocess_all,
251                  * which needs to be told to please restart its reprocessing. */
252                 rc = -ERESTART;
253         }
254         ptlrpc_req_finished(req);
255
256         RETURN(rc);
257 }
258
259 int ldlm_handle_enqueue(struct ptlrpc_request *req,
260                         ldlm_completion_callback completion_callback,
261                         ldlm_blocking_callback blocking_callback)
262 {
263         struct obd_device *obddev = req->rq_export->exp_obd;
264         struct ldlm_reply *dlm_rep;
265         struct ldlm_request *dlm_req;
266         int rc, size = sizeof(*dlm_rep), cookielen = 0;
267         __u32 flags;
268         ldlm_error_t err;
269         struct ldlm_lock *lock = NULL;
270         void *cookie = NULL;
271         ENTRY;
272
273         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
274
275         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
276         flags = dlm_req->lock_flags;
277         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
278             (flags & LDLM_FL_HAS_INTENT)) {
279                 /* In this case, the reply buffer is allocated deep in
280                  * local_lock_enqueue by the policy function. */
281                 cookie = req;
282                 cookielen = sizeof(*req);
283         } else {
284                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
285                                      &req->rq_repmsg);
286                 if (rc) {
287                         CERROR("out of memory\n");
288                         RETURN(-ENOMEM);
289                 }
290                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
291                         cookie = &dlm_req->lock_desc.l_extent;
292                         cookielen = sizeof(struct ldlm_extent);
293                 }
294         }
295
296         /* The lock's callback data might be set in the policy function */
297         lock = ldlm_lock_create(obddev->obd_namespace,
298                                 &dlm_req->lock_handle2,
299                                 dlm_req->lock_desc.l_resource.lr_name,
300                                 dlm_req->lock_desc.l_resource.lr_type,
301                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
302         if (!lock)
303                 GOTO(out, err = -ENOMEM);
304
305         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
306                sizeof(lock->l_remote_handle));
307         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
308
309         LASSERT(req->rq_export);
310         lock->l_export = req->rq_export;
311         l_lock(&lock->l_resource->lr_namespace->ns_lock);
312         list_add(&lock->l_export_chain,
313                  &lock->l_export->exp_ldlm_data.led_held_locks);
314         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
315
316         err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
317                                 &flags, completion_callback, blocking_callback);
318         if (err)
319                 GOTO(out, err);
320
321         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
322         dlm_rep->lock_flags = flags;
323
324         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
325         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
326                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
327                        sizeof(lock->l_extent));
328         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
329                 memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
330                        sizeof(dlm_rep->lock_resource_name));
331                 dlm_rep->lock_mode = lock->l_req_mode;
332         }
333
334         EXIT;
335  out:
336         if (lock)
337                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
338                            "(err=%d)", err);
339         req->rq_status = err;
340
341         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
342          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
343         if (lock) {
344                 if (!err)
345                         ldlm_reprocess_all(lock->l_resource);
346                 LDLM_LOCK_PUT(lock);
347         }
348         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
349
350         return 0;
351 }
352
353 int ldlm_handle_convert(struct ptlrpc_request *req)
354 {
355         struct ldlm_request *dlm_req;
356         struct ldlm_reply *dlm_rep;
357         struct ldlm_lock *lock;
358         int rc, size = sizeof(*dlm_rep);
359         ENTRY;
360
361         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
362         if (rc) {
363                 CERROR("out of memory\n");
364                 RETURN(-ENOMEM);
365         }
366         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
367         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
368         dlm_rep->lock_flags = dlm_req->lock_flags;
369
370         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
371         if (!lock) {
372                 req->rq_status = EINVAL;
373         } else {
374                 LDLM_DEBUG(lock, "server-side convert handler START");
375                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
376                                   &dlm_rep->lock_flags);
377                 if (ldlm_del_waiting_lock(lock))
378                         CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
379                 req->rq_status = 0;
380         }
381
382         if (lock) {
383                 ldlm_reprocess_all(lock->l_resource);
384                 LDLM_DEBUG(lock, "server-side convert handler END");
385                 LDLM_LOCK_PUT(lock);
386         } else
387                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
388
389         RETURN(0);
390 }
391
392 int ldlm_handle_cancel(struct ptlrpc_request *req)
393 {
394         struct ldlm_request *dlm_req;
395         struct ldlm_lock *lock;
396         int rc;
397         ENTRY;
398
399         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
400         if (rc) {
401                 CERROR("out of memory\n");
402                 RETURN(-ENOMEM);
403         }
404         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
405         if (!dlm_req) {
406                 CERROR("bad request buffer for cancel\n");
407                 RETURN(-EINVAL);
408         }
409
410         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
411         if (!lock) {
412                 CERROR("received cancel for unknown lock cookie "LPX64"\n",
413                        dlm_req->lock_handle1.cookie);
414                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
415                                   "(cookie "LPU64")",
416                                   dlm_req->lock_handle1.cookie);
417                 req->rq_status = ESTALE;
418         } else {
419                 LDLM_DEBUG(lock, "server-side cancel handler START");
420                 ldlm_lock_cancel(lock);
421                 if (ldlm_del_waiting_lock(lock))
422                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
423                 req->rq_status = 0;
424         }
425
426         if (ptlrpc_reply(req->rq_svc, req) != 0)
427                 LBUG();
428
429         if (lock) {
430                 ldlm_reprocess_all(lock->l_resource);
431                 LDLM_DEBUG(lock, "server-side cancel handler END");
432                 LDLM_LOCK_PUT(lock);
433         }
434
435         RETURN(0);
436 }
437
438 static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
439                                     struct ldlm_namespace *ns,
440                                     struct ldlm_request *dlm_req,
441                                     struct ldlm_lock *lock)
442 {
443         int do_ast;
444         ENTRY;
445
446         /* Try to narrow down this damn iozone bug */
447         if (lock->l_resource == NULL)
448                 CERROR("lock %p resource NULL\n", lock);
449         if (lock->l_resource->lr_type != LDLM_EXTENT)
450                 if (lock->l_resource->lr_namespace != ns)
451                         CERROR("lock %p namespace %p != passed ns %p\n", lock,
452                                lock->l_resource->lr_namespace, ns);
453         LDLM_DEBUG(lock, "client blocking AST callback handler START");
454
455         l_lock(&ns->ns_lock);
456         lock->l_flags |= LDLM_FL_CBPENDING;
457         do_ast = (!lock->l_readers && !lock->l_writers);
458         l_unlock(&ns->ns_lock);
459
460         if (do_ast) {
461                 LDLM_DEBUG(lock, "already unused, calling "
462                            "callback (%p)", lock->l_blocking_ast);
463                 if (lock->l_blocking_ast != NULL)
464                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
465                                              lock->l_data, LDLM_CB_BLOCKING);
466         } else {
467                 LDLM_DEBUG(lock, "Lock still has references, will be"
468                            " cancelled later");
469         }
470
471         LDLM_DEBUG(lock, "client blocking callback handler END");
472         LDLM_LOCK_PUT(lock);
473         EXIT;
474 }
475
476 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
477                                     struct ldlm_namespace *ns,
478                                     struct ldlm_request *dlm_req,
479                                     struct ldlm_lock *lock)
480 {
481         LIST_HEAD(ast_list);
482         ENTRY;
483
484         LDLM_DEBUG(lock, "client completion callback handler START");
485
486         l_lock(&ns->ns_lock);
487
488         /* If we receive the completion AST before the actual enqueue returned,
489          * then we might need to switch lock modes, resources, or extents. */
490         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
491                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
492                 LDLM_DEBUG(lock, "completion AST, new lock mode");
493         }
494         if (lock->l_resource->lr_type == LDLM_EXTENT)
495                 memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
496                        sizeof(lock->l_extent));
497         ldlm_resource_unlink_lock(lock);
498         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
499                    &lock->l_resource->lr_name,
500                    sizeof(lock->l_resource->lr_name)) != 0) {
501                 ldlm_lock_change_resource(ns, lock,
502                                          dlm_req->lock_desc.l_resource.lr_name);
503                 LDLM_DEBUG(lock, "completion AST, new resource");
504         }
505         lock->l_resource->lr_tmp = &ast_list;
506         ldlm_grant_lock(lock, req, sizeof(*req));
507         lock->l_resource->lr_tmp = NULL;
508         l_unlock(&ns->ns_lock);
509         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
510         LDLM_LOCK_PUT(lock);
511
512         ldlm_run_ast_work(&ast_list);
513
514         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
515                           lock);
516         EXIT;
517 }
518
519 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
520 {
521         req->rq_status = rc;
522         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
523                              &req->rq_repmsg);
524         if (rc)
525                 return rc;
526         return ptlrpc_reply(req->rq_svc, req);
527 }
528
529 static int ldlm_callback_handler(struct ptlrpc_request *req)
530 {
531         struct ldlm_namespace *ns;
532         struct ldlm_request *dlm_req;
533         struct ldlm_lock *lock;
534         int rc;
535         ENTRY;
536
537         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
538         if (rc) {
539                 CERROR("Invalid request: %d\n", rc);
540                 RETURN(rc);
541         }
542
543         if (req->rq_export == NULL) {
544                 struct ldlm_request *dlm_req;
545
546                 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
547                        req->rq_reqmsg->opc, req->rq_request_portal,
548                        req->rq_reply_portal);
549                 CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
550                        req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
551                 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
552                 CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
553                        dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
554                 ldlm_callback_reply(req, -ENOTCONN);
555                 RETURN(0);
556         }
557
558         if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
559                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
560         } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
561                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
562         } else {
563                 ldlm_callback_reply(req, -EIO);
564                 RETURN(0);
565         }
566
567         LASSERT(req->rq_export != NULL);
568         LASSERT(req->rq_export->exp_obd != NULL);
569         ns = req->rq_export->exp_obd->obd_namespace;
570         LASSERT(ns != NULL);
571
572         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
573         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
574         if (!lock) {
575                 CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
576                        dlm_req->lock_handle1.cookie);
577                 ldlm_callback_reply(req, -EINVAL);
578                 RETURN(0);
579         }
580
581         /* we want the ost thread to get this reply so that it can respond
582          * to ost requests (write cache writeback) that might be triggered
583          * in the callback */
584         ldlm_callback_reply(req, 0);
585
586         switch (req->rq_reqmsg->opc) {
587         case LDLM_BL_CALLBACK:
588                 CDEBUG(D_INODE, "blocking ast\n");
589                 ldlm_handle_bl_callback(req, ns, dlm_req, lock);
590                 break;
591         case LDLM_CP_CALLBACK:
592                 CDEBUG(D_INODE, "completion ast\n");
593                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
594                 break;
595         }
596
597         RETURN(0);
598 }
599
600 static int ldlm_cancel_handler(struct ptlrpc_request *req)
601 {
602         int rc;
603         ENTRY;
604
605         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
606         if (rc) {
607                 CERROR("lustre_ldlm: Invalid request: %d\n", rc);
608                 RETURN(rc);
609         }
610
611         if (req->rq_export == NULL) {
612                 struct ldlm_request *dlm_req;
613                 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
614                        req->rq_reqmsg->opc, req->rq_request_portal,
615                        req->rq_reply_portal);
616                 CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
617                        req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
618                 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
619                 ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
620                 RETURN(-ENOTCONN);
621         }
622
623         switch (req->rq_reqmsg->opc) {
624
625         /* XXX FIXME move this back to mds/handler.c, bug 625069 */
626         case LDLM_CANCEL:
627                 CDEBUG(D_INODE, "cancel\n");
628                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
629                 rc = ldlm_handle_cancel(req);
630                 if (rc)
631                         break;
632                 RETURN(0);
633
634         default:
635                 CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
636                 RETURN(-EINVAL);
637         }
638
639         RETURN(0);
640 }
641
642 static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
643                           void *karg, void *uarg)
644 {
645         struct obd_device *obddev = class_conn2obd(conn);
646         struct ptlrpc_connection *connection;
647         struct obd_uuid uuid = { "ldlm" };
648         int err = 0;
649         ENTRY;
650
651         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
652             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
653                 CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
654                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
655                 RETURN(-EINVAL);
656         }
657
658         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
659                   sizeof(*obddev->u.ldlm.ldlm_client));
660         connection = ptlrpc_uuid_to_connection(&uuid);
661         if (!connection)
662                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
663
664         switch (cmd) {
665         case IOC_LDLM_TEST:
666                 //err = ldlm_test(obddev, conn);
667                 err = 0;
668                 CERROR("-- NO TESTS WERE RUN done err %d\n", err);
669                 GOTO(out, err);
670         case IOC_LDLM_DUMP:
671                 ldlm_dump_all_namespaces();
672                 GOTO(out, err);
673         default:
674                 GOTO(out, err = -EINVAL);
675         }
676
677  out:
678         if (connection)
679                 ptlrpc_put_connection(connection);
680         OBD_FREE(obddev->u.ldlm.ldlm_client,
681                  sizeof(*obddev->u.ldlm.ldlm_client));
682         return err;
683 }
684
685 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
686 {
687         struct ldlm_obd *ldlm = &obddev->u.ldlm;
688         int rc, i;
689         ENTRY;
690
691         if (ldlm_already_setup)
692                 RETURN(-EALREADY);
693
694         rc = ldlm_proc_setup(obddev);
695         if (rc != 0)
696                 RETURN(rc);
697
698 #ifdef __KERNEL__
699         ldlm->ldlm_cb_service =
700                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
701                                 LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
702                                 LDLM_CB_REPLY_PORTAL,
703                                 ldlm_callback_handler, "ldlm_cbd");
704
705         if (!ldlm->ldlm_cb_service) {
706                 CERROR("failed to start service\n");
707                 GOTO(out_proc, rc = -ENOMEM);
708         }
709
710         ldlm->ldlm_cancel_service =
711                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
712                                 LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
713                                 LDLM_CANCEL_REPLY_PORTAL,
714                                 ldlm_cancel_handler, "ldlm_canceld");
715
716         if (!ldlm->ldlm_cancel_service) {
717                 CERROR("failed to start service\n");
718                 GOTO(out_proc, rc = -ENOMEM);
719         }
720
721         for (i = 0; i < LDLM_NUM_THREADS; i++) {
722                 char name[32];
723                 sprintf(name, "ldlm_cn_%02d", i);
724                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cancel_service,
725                                          name);
726                 if (rc) {
727                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
728                         LBUG();
729                         GOTO(out_thread, rc);
730                 }
731         }
732
733         for (i = 0; i < LDLM_NUM_THREADS; i++) {
734                 char name[32];
735                 sprintf(name, "ldlm_cb_%02d", i);
736                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cb_service, name);
737                 if (rc) {
738                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
739                         LBUG();
740                         GOTO(out_thread, rc);
741                 }
742         }
743
744 #endif
745         INIT_LIST_HEAD(&waiting_locks_list);
746         spin_lock_init(&waiting_locks_spinlock);
747         waiting_locks_timer.function = waiting_locks_callback;
748         waiting_locks_timer.data = 0;
749         init_timer(&waiting_locks_timer);
750
751         ldlm_already_setup = 1;
752
753         RETURN(0);
754
755  out_thread:
756 #ifdef __KERNEL__
757         ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
758         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
759         ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
760         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
761 #endif
762  out_proc:
763         ldlm_proc_cleanup(obddev);
764
765         return rc;
766 }
767
768 static int ldlm_cleanup(struct obd_device *obddev)
769 {
770         struct ldlm_obd *ldlm = &obddev->u.ldlm;
771         ENTRY;
772
773         if (!list_empty(&ldlm_namespace_list)) {
774                 CERROR("ldlm still has namespaces; clean these up first.\n");
775                 RETURN(-EBUSY);
776         }
777
778 #ifdef __KERNEL__
779         ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
780         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
781         ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
782         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
783         ldlm_proc_cleanup(obddev);
784 #endif
785         ldlm_already_setup = 0;
786         RETURN(0);
787 }
788
789 static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
790                         struct obd_uuid *cluuid, struct recovd_obd *recovd,
791                         ptlrpc_recovery_cb_t recover)
792 {
793         return class_connect(conn, src, cluuid);
794 }
795
796 struct obd_ops ldlm_obd_ops = {
797         o_owner:       THIS_MODULE,
798         o_iocontrol:   ldlm_iocontrol,
799         o_setup:       ldlm_setup,
800         o_cleanup:     ldlm_cleanup,
801         o_connect:     ldlm_connect,
802         o_disconnect:  class_disconnect
803 };
804
805 int __init ldlm_init(void)
806 {
807         int rc = class_register_type(&ldlm_obd_ops, 0, OBD_LDLM_DEVICENAME);
808         if (rc != 0)
809                 return rc;
810
811         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
812                                                sizeof(struct ldlm_resource), 0,
813                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
814         if (ldlm_resource_slab == NULL)
815                 return -ENOMEM;
816
817         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
818                                            sizeof(struct ldlm_lock), 0,
819                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
820         if (ldlm_lock_slab == NULL) {
821                 kmem_cache_destroy(ldlm_resource_slab);
822                 return -ENOMEM;
823         }
824
825         l_lock_init(&ldlm_handle_lock);
826
827         return 0;
828 }
829
830 static void __exit ldlm_exit(void)
831 {
832         class_unregister_type(OBD_LDLM_DEVICENAME);
833         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
834                 CERROR("couldn't free ldlm resource slab\n");
835         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
836                 CERROR("couldn't free ldlm lock slab\n");
837 }
838
/*
 * Symbols exported from this module.  The list is grouped under headings
 * that name a source file each.
 */

/* ldlm_lock.c */
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_unregister_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_set_data);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);

/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_match_or_enqueue);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);

/* ldlm_lockd.c (this file) */
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_del_waiting_lock);

/* The ldlm_test.c exports are compiled out. */
#if 0
/* ldlm_test.c */
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_regression_start);
EXPORT_SYMBOL(ldlm_regression_stop);
#endif

/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);

/* l_lock.c */
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);

/* Module metadata and entry points; kernel builds only. */
#ifdef __KERNEL__
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
MODULE_LICENSE("GPL");

module_init(ldlm_init);
module_exit(ldlm_exit);
#endif