Whamcloud - gitweb
Use debugging macros to aid in tracing.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 50d0a77..72a0622 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
-#include <linux/obd_class.h>
 #include <linux/lustre_dlm.h>
-#include <linux/lustre_net.h>
 
 extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
 
-static int ldlm_client_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                                void *data)
+static int _ldlm_namespace_new(struct obd_device *obddev,
+                               struct ptlrpc_request *req)
 {
-        LBUG();
-        return 0;
+        struct ldlm_request *dlm_req;
+        struct ldlm_namespace *ns;
+        int rc;
+        ldlm_error_t err;
+        ENTRY;
+
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc) {
+                CERROR("out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
+                                 &ns);
+        req->rq_status = err;
+
+        CERROR("err = %d\n", err);
+
+        RETURN(0);
 }
 
-static int ldlm_enqueue(struct ptlrpc_request *req)
+static int _ldlm_enqueue(struct ptlrpc_request *req)
 {
         struct ldlm_reply *dlm_rep;
         struct ldlm_request *dlm_req;
-        struct lustre_msg *msg, *req_msg;
+        int rc, size = sizeof(*dlm_rep);
         ldlm_error_t err;
-        int rc;
-        int bufsize = sizeof(*dlm_rep);
-        
-        rc = lustre_pack_msg(1, &bufsize, NULL, &req->rq_replen,
-                             &req->rq_repbuf);
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc) {
                 CERROR("out of memory\n");
                 req->rq_status = -ENOMEM;
                 RETURN(0);
         }
-        msg = (struct lustre_msg *)req->rq_repbuf;
-        req_msg = req->rq_req.lustre;
-        dlm_rep = lustre_msg_buf(0, msg);
-        dlm_req = lustre_msg_buf(0, req_msg);
-
-        msg->xid = req_msg->xid;
-
-        err = ldlm_local_lock_enqueue(req->rq_obd, dlm_req->ns_id,
-                                      &dlm_req->parent_res_handle,
-                                      &dlm_req->parent_lock_handle,
-                                      dlm_req->res_id, dlm_req->mode,
-                                      &dlm_req->flags, ldlm_client_callback,
-                                      ldlm_client_callback,
-                                      req_msg->buflens[1],
-                                      lustre_msg_buf(1, req_msg),
-                                      &dlm_rep->lock_handle);
-        msg->status = HTON__u32(err);
-
-        /* XXX unfinished */
+        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
+               sizeof(dlm_rep->lock_extent));
+        dlm_rep->flags = dlm_req->flags;
+
+        err = ldlm_local_lock_create(dlm_req->lock_desc.l_resource.lr_ns_id,
+                                     &dlm_req->lock_handle2,
+                                     dlm_req->lock_desc.l_resource.lr_name,
+                                     dlm_req->lock_desc.l_resource.lr_type,
+                                     &dlm_rep->lock_handle);
+        if (err != ELDLM_OK)
+                GOTO(out, err);
+
+        err = ldlm_local_lock_enqueue(&dlm_rep->lock_handle,
+                                      dlm_req->lock_desc.l_req_mode,
+                                      &dlm_rep->lock_extent,
+                                      &dlm_rep->flags,
+                                      ldlm_cli_callback,
+                                      ldlm_cli_callback,
+                                      lustre_msg_buf(req->rq_reqmsg, 1),
+                                      req->rq_reqmsg->buflens[1]);
+        if (err != ELDLM_OK)
+                GOTO(out, err);
+
+        lock = ldlm_handle2object(&dlm_rep->lock_handle);
+        memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
+               sizeof(lock->l_remote_handle));
+        lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
+        EXIT;
+ out:
+        req->rq_status = err;
+        CERROR("err = %d\n", err);
+
         return 0;
 }
 
-static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
-                       struct ptlrpc_request *req)
+static int _ldlm_convert(struct ptlrpc_request *req)
 {
+        struct ldlm_request *dlm_req;
         int rc;
-        struct ptlreq_hdr *hdr;
+        ENTRY;
+
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc) {
+                CERROR("out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
+        req->rq_status =
+                ldlm_local_lock_convert(&dlm_req->lock_handle1,
+                                        dlm_req->lock_desc.l_req_mode,
+                                        &dlm_req->flags);
+        RETURN(0);
+}
+
+static int _ldlm_cancel(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        int rc;
         ENTRY;
 
-        hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc) {
+                CERROR("out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        if (NTOH__u32(hdr->type) != PTL_RPC_REQUEST) {
-                CERROR("lustre_ldlm: wrong packet type sent %d\n",
-                       NTOH__u32(hdr->type));
-                rc = -EINVAL;
-                GOTO(out, rc);
+        req->rq_status = ldlm_local_lock_cancel(&dlm_req->lock_handle1);
+        RETURN(0);
+}
+
+static int _ldlm_callback(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        struct ldlm_lock *lock;
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc) {
+                CERROR("out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
         }
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        lock = ldlm_handle2object(&dlm_req->lock_handle1);
+        ldlm_lock_dump(lock);
+        if (dlm_req->lock_handle2.addr) {
+                CERROR("Got blocked callback for lock %p.\n", lock);
+                /* FIXME: do something impressive. */
+        } else {
+                CERROR("Got granted callback for lock %p.\n", lock);
+                lock->l_granted_mode = lock->l_req_mode;
+                wake_up(&lock->l_waitq);
+        }
+
+        req->rq_status = 0;
 
-        rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
-        req->rq_reqhdr = (void *)req->rq_reqbuf;
+        RETURN(0);
+}
+
+static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
+                       struct ptlrpc_request *req)
+{
+        int rc;
+        ENTRY;
+
+        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
         if (rc) {
                 CERROR("lustre_ldlm: Invalid request\n");
                 GOTO(out, rc);
         }
 
-        switch (req->rq_reqhdr->opc) {
+        if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
+                CERROR("lustre_ldlm: wrong packet type sent %d\n",
+                       req->rq_reqmsg->type);
+                GOTO(out, rc = -EINVAL);
+        }
+
+        switch (req->rq_reqmsg->opc) {
+        case LDLM_NAMESPACE_NEW:
+                CDEBUG(D_INODE, "namespace_new\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
+                rc = _ldlm_namespace_new(dev, req);
+                break;
+
         case LDLM_ENQUEUE:
                 CDEBUG(D_INODE, "enqueue\n");
-                OBD_FAIL_RETURN(req, OBD_FAIL_LDLM_ENQUEUE);
-                rc = ldlm_enqueue(req);
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+                rc = _ldlm_enqueue(req);
                 break;
-#if 0
+
         case LDLM_CONVERT:
                 CDEBUG(D_INODE, "convert\n");
-                OBD_FAIL_RETURN(req, OBD_FAIL_LDLM_CONVERT);
-                rc = ldlm_convert(req);
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+                rc = _ldlm_convert(req);
                 break;
 
         case LDLM_CANCEL:
                 CDEBUG(D_INODE, "cancel\n");
-                OBD_FAIL_RETURN(req, OBD_FAIL_LDLM_CANCEL);
-                rc = ldlm_cancel(req);
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
+                rc = _ldlm_cancel(req);
                 break;
 
         case LDLM_CALLBACK:
                 CDEBUG(D_INODE, "callback\n");
-                OBD_FAIL_RETURN(req, OBD_FAIL_LDLM_CALLBACK);
-                rc = ldlm_callback(req);
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
+                rc = _ldlm_callback(req);
                 break;
-#endif
 
         default:
-                rc = ptlrpc_error(dev, svc, req);
+                rc = ptlrpc_error(svc, req);
                 RETURN(rc);
         }
 
-        EXIT;
 out:
-        if (rc) {
-                CERROR("no header\n");
-                return 0;
-        }
-
-        if( req->rq_status) {
-                ptlrpc_error(dev, svc, req);
-        } else {
-                CDEBUG(D_NET, "sending reply\n");
-                ptlrpc_reply(dev, svc, req);
-        }
-
-        return 0;
+        if (rc)
+                RETURN(ptlrpc_error(svc, req));
+        else
+                RETURN(ptlrpc_reply(svc, req));
 }
 
-
-
 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
                           void *uarg)
 {
         struct obd_device *obddev = conn->oc_dev;
+        struct ptlrpc_connection *connection;
         int err;
-
         ENTRY;
 
-        if ( _IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
-             _IOC_NR(cmd) < IOC_LDLM_MIN_NR  ||
-             _IOC_NR(cmd) > IOC_LDLM_MAX_NR ) {
+        if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
+            _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
                 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
-                                _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
-                EXIT;
-                return -EINVAL;
+                       _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
+                RETURN(-EINVAL);
         }
 
+        ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                           obddev->u.ldlm.ldlm_client);
+        connection = ptlrpc_uuid_to_connection("ldlm");
+        if (!connection)
+                CERROR("No LDLM UUID found: assuming ldlm is local.\n");
+
         switch (cmd) {
         case IOC_LDLM_TEST: {
-                err = ldlm_test(obddev);
+                err = ldlm_test(obddev, connection);
                 CERROR("-- done err %d\n", err);
-                EXIT;
-                break;
+                GOTO(out, err);
         }
         default:
-                err = -EINVAL;
-                EXIT;
-                break;
+                GOTO(out, err = -EINVAL);
         }
 
+ out:
+        if (connection)
+                ptlrpc_put_connection(connection);
         return err;
 }
 
@@ -183,24 +276,97 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
         int err;
         ENTRY;
 
-        INIT_LIST_HEAD(&obddev->u.ldlm.ldlm_namespaces);
-        obddev->u.ldlm.ldlm_lock = SPIN_LOCK_UNLOCKED;
+        ldlm_spinlock = SPIN_LOCK_UNLOCKED;
 
-        ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024,
-                                             LDLM_REQUEST_PORTAL,
-                                             LDLM_REPLY_PORTAL,
-                                             "self", ldlm_handle);
-
-        rpc_register_service(ldlm->ldlm_service, "self");
+        ldlm->ldlm_service =
+                ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
+                                LDLM_REPLY_PORTAL, "self", ldlm_handle);
+        if (!ldlm->ldlm_service)
+                LBUG();
 
         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
-        if (err)
+        if (err) {
                 CERROR("cannot start thread\n");
+                LBUG();
+        }
+
+        OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
+        if (ldlm->ldlm_client == NULL)
+                LBUG();
+        ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                           ldlm->ldlm_client);
 
         MOD_INC_USE_COUNT;
         RETURN(0);
 }
 
+static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
+{
+        struct list_head *tmp, *pos;
+        int rc = 0;
+
+        list_for_each_safe(tmp, pos, q) {
+                struct ldlm_lock *lock;
+
+                if (rc) {
+                        /* Res was already cleaned up. */
+                        LBUG();
+                }
+
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                ldlm_resource_del_lock(lock);
+                ldlm_lock_free(lock);
+                rc = ldlm_resource_put(res);
+        }
+
+        return rc;
+}
+
+static int do_free_namespace(struct ldlm_namespace *ns)
+{
+        struct list_head *tmp, *pos;
+        int i, rc;
+
+        for (i = 0; i < RES_HASH_SIZE; i++) {
+                list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+                        struct ldlm_resource *res;
+                        res = list_entry(tmp, struct ldlm_resource, lr_hash);
+                        list_del_init(&res->lr_hash);
+
+                        rc = cleanup_resource(res, &res->lr_granted);
+                        if (!rc)
+                                rc = cleanup_resource(res, &res->lr_converting);
+                        if (!rc)
+                                rc = cleanup_resource(res, &res->lr_waiting);
+
+                        while (rc == 0)
+                                rc = ldlm_resource_put(res);
+                }
+        }
+
+        return ldlm_namespace_free(ns);
+}
+
+static int ldlm_free_all(struct obd_device *obddev)
+{
+        struct list_head *tmp, *pos;
+        int rc = 0;
+
+        ldlm_lock();
+
+        list_for_each_safe(tmp, pos, &ldlm_namespaces) {
+                struct ldlm_namespace *ns;
+                ns = list_entry(tmp, struct ldlm_namespace, ns_link);
+
+                rc |= do_free_namespace(ns);
+        }
+
+        ldlm_unlock();
+
+        return rc;
+}
+
 static int ldlm_cleanup(struct obd_device *obddev)
 {
         struct ldlm_obd *ldlm = &obddev->u.ldlm;
@@ -214,9 +380,14 @@ static int ldlm_cleanup(struct obd_device *obddev)
                 CERROR("Request list not empty!\n");
         }
 
-        rpc_unregister_service(ldlm->ldlm_service);
+        OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
 
+        if (ldlm_free_all(obddev)) {
+                CERROR("ldlm_free_all could not complete.\n");
+                RETURN(-1);
+        }
+
         MOD_DEC_USE_COUNT;
         RETURN(0);
 }
@@ -262,7 +433,7 @@ static void __exit ldlm_exit(void)
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
-MODULE_LICENSE("GPL"); 
+MODULE_LICENSE("GPL");
 
 module_init(ldlm_init);
 module_exit(ldlm_exit);