Whamcloud - gitweb
merge from b_devel -> b_ad
authorrread <rread>
Thu, 5 Jun 2003 04:12:30 +0000 (04:12 +0000)
committerrread <rread>
Thu, 5 Jun 2003 04:12:30 +0000 (04:12 +0000)
  DEVEL_AD_PARENT_20030526 -> DEVEL_AD_PARENT_20030605

Large, squirrelly merge. It mounts, and acceptance_small.sh finishes
the test and then fails in cleanup, just like b_devel.

lustre/ldlm/ldlm_lib.c

index b537ebe..2616392 100644 (file)
 #include <linux/obd_ost.h>
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_mds.h>
+#include <linux/lustre_net.h>
 
-int ptlrpc_import_connect(struct lustre_handle *conn, struct obd_device *obd,
+int client_import_connect(struct lustre_handle *dlm_handle, 
+                          struct obd_device *obd,
                           struct obd_uuid *cluuid)
 {
         struct client_obd *cli = &obd->u.cli;
@@ -41,16 +43,16 @@ int ptlrpc_import_connect(struct lustre_handle *conn, struct obd_device *obd,
         /* XXX maybe this is a good time to create a connect struct? */
         int rc, size[] = {sizeof(imp->imp_target_uuid),
                           sizeof(obd->obd_uuid),
-                          sizeof(*conn)};
+                          sizeof(*dlm_handle)};
         char *tmp[] = {imp->imp_target_uuid.uuid,
                        obd->obd_uuid.uuid,
-                       (char *)conn};
+                       (char *)dlm_handle};
         int rq_opc = (obd->obd_type->typ_ops->o_brw) ? OST_CONNECT :MDS_CONNECT;
         int msg_flags;
 
         ENTRY;
         down(&cli->cl_sem);
-        rc = class_connect(conn, obd, cluuid);
+        rc = class_connect(dlm_handle, obd, cluuid);
         if (rc)
                 GOTO(out_sem, rc);
 
@@ -72,17 +74,19 @@ int ptlrpc_import_connect(struct lustre_handle *conn, struct obd_device *obd,
         request->rq_level = LUSTRE_CONN_NEW;
         request->rq_replen = lustre_msg_size(0, NULL);
 
-        imp->imp_export = exp = class_conn2export(conn);
-        exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
+        imp->imp_dlm_handle = *dlm_handle;
 
         imp->imp_level = LUSTRE_CONN_CON;
         rc = ptlrpc_queue_wait(request);
         if (rc) {
-                class_export_put(imp->imp_export);
-                imp->imp_export = exp = NULL;
+                class_disconnect(dlm_handle, 0);
                 GOTO(out_req, rc);
         }
 
+        exp = class_conn2export(dlm_handle);
+        exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
+        class_export_put(exp);
+
         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
         if (rq_opc == MDS_CONNECT || msg_flags & MSG_CONNECT_REPLAYABLE) {
                 imp->imp_replayable = 1;
@@ -103,16 +107,16 @@ out_ldlm:
                 obd->obd_namespace = NULL;
 out_disco:
                 cli->cl_conn_count--;
-                class_disconnect(conn, 0);
+                class_disconnect(dlm_handle, 0);
         }
 out_sem:
         up(&cli->cl_sem);
         return rc;
 }
 
-int ptlrpc_import_disconnect(struct lustre_handle *conn, int failover)
+int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
 {
-        struct obd_device *obd = class_conn2obd(conn);
+        struct obd_device *obd = class_conn2obd(dlm_handle);
         struct client_obd *cli = &obd->u.cli;
         struct obd_import *imp = cli->cl_import;
         struct ptlrpc_request *request = NULL;
@@ -121,7 +125,7 @@ int ptlrpc_import_disconnect(struct lustre_handle *conn, int failover)
 
         if (!obd) {
                 CERROR("invalid connection for disconnect: cookie "LPX64"\n",
-                       conn ? conn->cookie : -1UL);
+                       dlm_handle ? dlm_handle->cookie : -1UL);
                 RETURN(-EINVAL);
         }
 
@@ -146,32 +150,28 @@ int ptlrpc_import_disconnect(struct lustre_handle *conn, int failover)
         }
 
         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
-        if (obd->obd_no_recov && imp->imp_level != LUSTRE_CONN_FULL) {
+        if (obd->obd_no_recov) {
                 ptlrpc_abort_inflight(imp);
         } else {
                 request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
                 if (!request)
                         GOTO(out_req, rc = -ENOMEM);
-                
+
                 request->rq_replen = lustre_msg_size(0, NULL);
-                
+
                 /* Process disconnects even if we're waiting for recovery. */
                 request->rq_level = LUSTRE_CONN_RECOVD;
-                
+
                 rc = ptlrpc_queue_wait(request);
                 if (rc)
                         GOTO(out_req, rc);
         }
-        if (imp->imp_export) {
-                class_export_put(imp->imp_export);
-                imp->imp_export = NULL;
-        }
         EXIT;
  out_req:
         if (request)
                 ptlrpc_req_finished(request);
  out_no_disconnect:
-        err = class_disconnect(conn, 0);
+        err = class_disconnect(dlm_handle, 0);
         if (!rc && err)
                 rc = err;
  out_sem:
@@ -179,240 +179,6 @@ int ptlrpc_import_disconnect(struct lustre_handle *conn, int failover)
         RETURN(rc);
 }
 
-/* Debugging check only needed during development */
-#ifdef OBD_CTXT_DEBUG
-# define ASSERT_CTXT_MAGIC(magic) LASSERT((magic) == OBD_RUN_CTXT_MAGIC)
-# define ASSERT_NOT_KERNEL_CTXT(msg) LASSERT(!segment_eq(get_fs(), get_ds()))
-# define ASSERT_KERNEL_CTXT(msg) LASSERT(segment_eq(get_fs(), get_ds()))
-#else
-# define ASSERT_CTXT_MAGIC(magic) do {} while(0)
-# define ASSERT_NOT_KERNEL_CTXT(msg) do {} while(0)
-# define ASSERT_KERNEL_CTXT(msg) do {} while(0)
-#endif
-
-/* push / pop to root of obd store */
-void push_ctxt(struct obd_run_ctxt *save, struct obd_run_ctxt *new_ctx,
-               struct obd_ucred *uc)
-{
-        //ASSERT_NOT_KERNEL_CTXT("already in kernel context!\n");
-        ASSERT_CTXT_MAGIC(new_ctx->magic);
-        OBD_SET_CTXT_MAGIC(save);
-
-        /*
-        CDEBUG(D_INFO,
-               "= push %p->%p = cur fs %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n",
-               save, current, current->fs, current->fs->pwd,
-               atomic_read(&current->fs->pwd->d_count),
-               atomic_read(&current->fs->pwd->d_inode->i_count),
-               current->fs->pwd->d_name.len, current->fs->pwd->d_name.name,
-               current->fs->pwdmnt,
-               atomic_read(&current->fs->pwdmnt->mnt_count));
-        */
-
-        save->fs = get_fs();
-        LASSERT(atomic_read(&current->fs->pwd->d_count));
-        LASSERT(atomic_read(&new_ctx->pwd->d_count));
-        save->pwd = dget(current->fs->pwd);
-        save->pwdmnt = mntget(current->fs->pwdmnt);
-
-        LASSERT(save->pwd);
-        LASSERT(save->pwdmnt);
-        LASSERT(new_ctx->pwd);
-        LASSERT(new_ctx->pwdmnt);
-
-        if (uc) {
-                save->fsuid = current->fsuid;
-                save->fsgid = current->fsgid;
-                save->cap = current->cap_effective;
-
-                current->fsuid = uc->ouc_fsuid;
-                current->fsgid = uc->ouc_fsgid;
-                current->cap_effective = uc->ouc_cap;
-                if (uc->ouc_suppgid1 != -1)
-                        current->groups[current->ngroups++] = uc->ouc_suppgid1;
-                if (uc->ouc_suppgid2 != -1)
-                        current->groups[current->ngroups++] = uc->ouc_suppgid2;
-        }
-        set_fs(new_ctx->fs);
-        set_fs_pwd(current->fs, new_ctx->pwdmnt, new_ctx->pwd);
-
-        /*
-        CDEBUG(D_INFO,
-               "= push %p->%p = cur fs %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n",
-               new_ctx, current, current->fs, current->fs->pwd,
-               atomic_read(&current->fs->pwd->d_count),
-               atomic_read(&current->fs->pwd->d_inode->i_count),
-               current->fs->pwd->d_name.len, current->fs->pwd->d_name.name,
-               current->fs->pwdmnt,
-               atomic_read(&current->fs->pwdmnt->mnt_count));
-        */
-}
-
-void pop_ctxt(struct obd_run_ctxt *saved, struct obd_run_ctxt *new_ctx,
-              struct obd_ucred *uc)
-{
-        //printk("pc0");
-        ASSERT_CTXT_MAGIC(saved->magic);
-        //printk("pc1");
-        ASSERT_KERNEL_CTXT("popping non-kernel context!\n");
-
-        /*
-        CDEBUG(D_INFO,
-               " = pop  %p==%p = cur %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n",
-               new_ctx, current, current->fs, current->fs->pwd,
-               atomic_read(&current->fs->pwd->d_count),
-               atomic_read(&current->fs->pwd->d_inode->i_count),
-               current->fs->pwd->d_name.len, current->fs->pwd->d_name.name,
-               current->fs->pwdmnt,
-               atomic_read(&current->fs->pwdmnt->mnt_count));
-        */
-
-        LASSERT(current->fs->pwd == new_ctx->pwd);
-        LASSERT(current->fs->pwdmnt == new_ctx->pwdmnt);
-
-        set_fs(saved->fs);
-        set_fs_pwd(current->fs, saved->pwdmnt, saved->pwd);
-
-        dput(saved->pwd);
-        mntput(saved->pwdmnt);
-        if (uc) {
-                current->fsuid = saved->fsuid;
-                current->fsgid = saved->fsgid;
-                current->cap_effective = saved->cap;
-
-                if (uc->ouc_suppgid1 != -1)
-                        current->ngroups--;
-                if (uc->ouc_suppgid2 != -1)
-                        current->ngroups--;
-        }
-
-        /*
-        CDEBUG(D_INFO,
-               "= pop  %p->%p = cur fs %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n",
-               saved, current, current->fs, current->fs->pwd,
-               atomic_read(&current->fs->pwd->d_count),
-               atomic_read(&current->fs->pwd->d_inode->i_count),
-               current->fs->pwd->d_name.len, current->fs->pwd->d_name.name,
-               current->fs->pwdmnt,
-               atomic_read(&current->fs->pwdmnt->mnt_count));
-        */
-}
-
-/* utility to make a file */
-struct dentry *simple_mknod(struct dentry *dir, char *name, int mode)
-{
-        struct dentry *dchild;
-        int err = 0;
-        ENTRY;
-
-        ASSERT_KERNEL_CTXT("kernel doing mknod outside kernel context\n");
-        CDEBUG(D_INODE, "creating file %*s\n", (int)strlen(name), name);
-
-        dchild = lookup_one_len(name, dir, strlen(name));
-        if (IS_ERR(dchild))
-                GOTO(out_up, dchild);
-
-        if (dchild->d_inode) {
-                if ((dchild->d_inode->i_mode & S_IFMT) != S_IFREG)
-                        GOTO(out_err, err = -EEXIST);
-
-                GOTO(out_up, dchild);
-        }
-
-        err = vfs_create(dir->d_inode, dchild, (mode & ~S_IFMT) | S_IFREG);
-        if (err)
-                GOTO(out_err, err);
-
-        RETURN(dchild);
-
-out_err:
-        dput(dchild);
-        dchild = ERR_PTR(err);
-out_up:
-        return dchild;
-}
-
-/* utility to make a directory */
-struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode)
-{
-        struct dentry *dchild;
-        int err = 0;
-        ENTRY;
-
-        ASSERT_KERNEL_CTXT("kernel doing mkdir outside kernel context\n");
-        CDEBUG(D_INODE, "creating directory %*s\n", (int)strlen(name), name);
-        dchild = lookup_one_len(name, dir, strlen(name));
-        if (IS_ERR(dchild))
-                GOTO(out_up, dchild);
-
-        if (dchild->d_inode) {
-                if (!S_ISDIR(dchild->d_inode->i_mode))
-                        GOTO(out_err, err = -ENOTDIR);
-
-                GOTO(out_up, dchild);
-        }
-
-        err = vfs_mkdir(dir->d_inode, dchild, mode);
-        if (err)
-                GOTO(out_err, err);
-
-        RETURN(dchild);
-
-out_err:
-        dput(dchild);
-        dchild = ERR_PTR(err);
-out_up:
-        return dchild;
-}
-
-/*
- * Read a file from within kernel context.  Prior to calling this
- * function we should already have done a push_ctxt().
- */
-int lustre_fread(struct file *file, char *str, int len, loff_t *off)
-{
-        ASSERT_KERNEL_CTXT("kernel doing read outside kernel context\n");
-        if (!file || !file->f_op || !file->f_op->read || !off)
-                RETURN(-ENOSYS);
-
-        return file->f_op->read(file, str, len, off);
-}
-
-/*
- * Write a file from within kernel context.  Prior to calling this
- * function we should already have done a push_ctxt().
- */
-int lustre_fwrite(struct file *file, const char *str, int len, loff_t *off)
-{
-        ENTRY;
-        ASSERT_KERNEL_CTXT("kernel doing write outside kernel context\n");
-        if (!file)
-                RETURN(-ENOENT);
-        if (!file->f_op)
-                RETURN(-ENOSYS);
-        if (!off)
-                RETURN(-EINVAL);
-
-        if (!file->f_op->write)
-                RETURN(-EROFS);
-
-        RETURN(file->f_op->write(file, str, len, off));
-}
-
-/*
- * Sync a file from within kernel context.  Prior to calling this
- * function we should already have done a push_ctxt().
- */
-int lustre_fsync(struct file *file)
-{
-        ENTRY;
-        ASSERT_KERNEL_CTXT("kernel doing sync outside kernel context\n");
-        if (!file || !file->f_op || !file->f_op->fsync)
-                RETURN(-ENOSYS);
-
-        RETURN(file->f_op->fsync(file, file->f_dentry, 0));
-}
-
 /* --------------------------------------------------------------------------
  * from old lib/target.c
  * -------------------------------------------------------------------------- */
@@ -494,7 +260,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         }
 
         /* XXX extract a nettype and format accordingly */
-        snprintf(remote_uuid.uuid, sizeof remote_uuid, 
+        snprintf(remote_uuid.uuid, sizeof remote_uuid,
                  "NET_"LPX64"_UUID", req->rq_peer.peer_nid);
 
         spin_lock_bh(&target->obd_processing_task_lock);
@@ -505,7 +271,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 
         tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
         if (tmp == NULL)
-                GOTO(out, -EPROTO);
+                GOTO(out, rc = -EPROTO);
 
         memcpy(&conn, tmp, sizeof conn);
 
@@ -514,14 +280,13 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 GOTO(out, rc);
 
         /* lctl gets a backstage, all-access pass. */
-        if (!strcmp(cluuid.uuid, "OBD_CLASS_UUID"))
+        if (obd_uuid_equals(&cluuid, &lctl_fake_uuid))
                 goto dont_check_exports;
 
         spin_lock(&target->obd_dev_lock);
         list_for_each(p, &target->obd_exports) {
                 export = list_entry(p, struct obd_export, exp_obd_chain);
-                if (!memcmp(&cluuid, &export->exp_client_uuid,
-                            sizeof(export->exp_client_uuid))) {
+                if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
                         spin_unlock(&target->obd_dev_lock);
                         LASSERT(export->exp_obd == target);
 
@@ -602,7 +367,6 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
         dlmimp->imp_remote_handle = conn;
         dlmimp->imp_obd = target;
-        dlmimp->imp_export = class_export_get(export);
         dlmimp->imp_dlm_fake = 1;
         dlmimp->imp_level = LUSTRE_CONN_FULL;
         class_import_put(dlmimp);
@@ -634,7 +398,7 @@ int target_handle_disconnect(struct ptlrpc_request *req)
 }
 
 /*
- * Recovery functions 
+ * Recovery functions
  */
 
 void target_cancel_recovery_timer(struct obd_device *obd)
@@ -653,6 +417,7 @@ static void abort_delayed_replies(struct obd_device *obd)
                 req->rq_type = PTL_RPC_MSG_ERR;
                 ptlrpc_reply(req);
                 list_del(&req->rq_list);
+                OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
                 OBD_FREE(req, sizeof *req);
         }
 }
@@ -661,12 +426,21 @@ static void abort_recovery_queue(struct obd_device *obd)
 {
         struct ptlrpc_request *req;
         struct list_head *tmp, *n;
+        int rc;
+
         list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 DEBUG_REQ(D_ERROR, req, "aborted:");
                 req->rq_status = -ENOTCONN;
                 req->rq_type = PTL_RPC_MSG_ERR;
-                ptlrpc_reply(req);
+                rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+                                     &req->rq_repmsg);
+                if (rc == 0) {
+                        ptlrpc_reply(req);
+                } else {
+                        DEBUG_REQ(D_ERROR, req,
+                                  "packing failed for abort-reply; skipping");
+                }
                 list_del(&req->rq_list);
                 class_export_put(req->rq_export);
                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
@@ -840,7 +614,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
 
         /* If we're processing the queue, we want don't want to queue this
          * message.
-         * 
+         *
          * Also, if this request has a transno less than the one we're waiting
          * for, we should process it now.  It could (and currently always will)
          * be an open request for a descriptor that was opened some time ago.
@@ -997,8 +771,6 @@ static void ptlrpc_abort_reply (struct ptlrpc_request *req)
                 spin_unlock_irqrestore (&req->rq_lock, flags);
                 return;
         case PTL_INV_MD:
-                /* Both SENT and ACK callbacks happened */
-                LASSERT (!req->rq_want_ack);
                 return;
         case PTL_MD_INUSE:
                 /* Still sending or ACK callback in progress: wait until
@@ -1031,7 +803,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         wait_queue_t commit_wait;
         struct obd_device *obd =
                 req->rq_export ? req->rq_export->exp_obd : NULL;
-        struct obd_export *exp = 
+        struct obd_export *exp =
                 (req->rq_export && req->rq_ack_locks[0].mode) ?
                 req->rq_export : NULL;
 
@@ -1057,13 +829,8 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
                         OBD_FREE(req->rq_repmsg, req->rq_replen);
                         req->rq_repmsg = NULL;
                 }
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_want_ack = 0;
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-                netrc = -ECOMM;
-                /* NB if we want this failure to simulate sending OK but
-                 * timing out on the ACK, we'll have to be smarter about
-                 * calling ptlrpc_abort_reply() below. */
+                init_waitqueue_head(&req->rq_wait_for_rep);
+                netrc = 0;
         }
 
         /* a failed send simulates the callbacks */