From 40b1c9919699c81bc45b916079365197fe948dc6 Mon Sep 17 00:00:00 2001 From: rread Date: Thu, 5 Jun 2003 04:12:30 +0000 Subject: [PATCH] merge from b_devel -> b_ad DEVEL_AD_PARENT_20030526 -> DEVEL_AD_PARENT_20030605 Large, squirrelly merge. It mounts, and acceptance_small.sh finishes the test and then fails in cleanup, just like b_devel. --- lustre/ldlm/ldlm_lib.c | 315 +++++++------------------------------------------ 1 file changed, 41 insertions(+), 274 deletions(-) diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index b537ebe..2616392 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -30,8 +30,10 @@ #include #include #include +#include -int ptlrpc_import_connect(struct lustre_handle *conn, struct obd_device *obd, +int client_import_connect(struct lustre_handle *dlm_handle, + struct obd_device *obd, struct obd_uuid *cluuid) { struct client_obd *cli = &obd->u.cli; @@ -41,16 +43,16 @@ int ptlrpc_import_connect(struct lustre_handle *conn, struct obd_device *obd, /* XXX maybe this is a good time to create a connect struct? */ int rc, size[] = {sizeof(imp->imp_target_uuid), sizeof(obd->obd_uuid), - sizeof(*conn)}; + sizeof(*dlm_handle)}; char *tmp[] = {imp->imp_target_uuid.uuid, obd->obd_uuid.uuid, - (char *)conn}; + (char *)dlm_handle}; int rq_opc = (obd->obd_type->typ_ops->o_brw) ? OST_CONNECT :MDS_CONNECT; int msg_flags; ENTRY; down(&cli->cl_sem); - rc = class_connect(conn, obd, cluuid); + rc = class_connect(dlm_handle, obd, cluuid); if (rc) GOTO(out_sem, rc); @@ -72,17 +74,19 @@ int ptlrpc_import_connect(struct lustre_handle *conn, struct obd_device *obd, request->rq_level = LUSTRE_CONN_NEW; request->rq_replen = lustre_msg_size(0, NULL); - imp->imp_export = exp = class_conn2export(conn); - exp->exp_connection = ptlrpc_connection_addref(request->rq_connection); + imp->imp_dlm_handle = *dlm_handle; imp->imp_level = LUSTRE_CONN_CON; rc = ptlrpc_queue_wait(request); if (rc) { - class_export_put(imp->imp_export); - imp->imp_export = exp = NULL; + class_disconnect(dlm_handle, 0); GOTO(out_req, rc); } + exp = class_conn2export(dlm_handle); + exp->exp_connection = ptlrpc_connection_addref(request->rq_connection); + class_export_put(exp); + msg_flags = lustre_msg_get_op_flags(request->rq_repmsg); if (rq_opc == MDS_CONNECT || msg_flags & MSG_CONNECT_REPLAYABLE) { imp->imp_replayable = 1; @@ -103,16 +107,16 @@ out_ldlm: obd->obd_namespace = NULL; out_disco: cli->cl_conn_count--; - class_disconnect(conn, 0); + class_disconnect(dlm_handle, 0); } out_sem: up(&cli->cl_sem); return rc; } -int ptlrpc_import_disconnect(struct lustre_handle *conn, int failover) +int client_import_disconnect(struct lustre_handle *dlm_handle, int failover) { - struct obd_device *obd = class_conn2obd(conn); + struct obd_device *obd = class_conn2obd(dlm_handle); struct client_obd *cli = &obd->u.cli; struct obd_import *imp = cli->cl_import; struct ptlrpc_request *request = NULL; @@ -121,7 +125,7 @@ int ptlrpc_import_disconnect(struct lustre_handle *conn, int failover) if (!obd) { CERROR("invalid connection for disconnect: cookie "LPX64"\n", - conn ? conn->cookie : -1UL); + dlm_handle ? dlm_handle->cookie : -1UL); RETURN(-EINVAL); } @@ -146,32 +150,28 @@ int ptlrpc_import_disconnect(struct lustre_handle *conn, int failover) } /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */ - if (obd->obd_no_recov && imp->imp_level != LUSTRE_CONN_FULL) { + if (obd->obd_no_recov) { ptlrpc_abort_inflight(imp); } else { request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL); if (!request) GOTO(out_req, rc = -ENOMEM); - + request->rq_replen = lustre_msg_size(0, NULL); - + /* Process disconnects even if we're waiting for recovery. */ request->rq_level = LUSTRE_CONN_RECOVD; - + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out_req, rc); } - if (imp->imp_export) { - class_export_put(imp->imp_export); - imp->imp_export = NULL; - } EXIT; out_req: if (request) ptlrpc_req_finished(request); out_no_disconnect: - err = class_disconnect(conn, 0); + err = class_disconnect(dlm_handle, 0); if (!rc && err) rc = err; out_sem: @@ -179,240 +179,6 @@ int ptlrpc_import_disconnect(struct lustre_handle *conn, int failover) RETURN(rc); } -/* Debugging check only needed during development */ -#ifdef OBD_CTXT_DEBUG -# define ASSERT_CTXT_MAGIC(magic) LASSERT((magic) == OBD_RUN_CTXT_MAGIC) -# define ASSERT_NOT_KERNEL_CTXT(msg) LASSERT(!segment_eq(get_fs(), get_ds())) -# define ASSERT_KERNEL_CTXT(msg) LASSERT(segment_eq(get_fs(), get_ds())) -#else -# define ASSERT_CTXT_MAGIC(magic) do {} while(0) -# define ASSERT_NOT_KERNEL_CTXT(msg) do {} while(0) -# define ASSERT_KERNEL_CTXT(msg) do {} while(0) -#endif - -/* push / pop to root of obd store */ -void push_ctxt(struct obd_run_ctxt *save, struct obd_run_ctxt *new_ctx, - struct obd_ucred *uc) -{ - //ASSERT_NOT_KERNEL_CTXT("already in kernel context!\n"); - ASSERT_CTXT_MAGIC(new_ctx->magic); - OBD_SET_CTXT_MAGIC(save); - - /* - CDEBUG(D_INFO, - "= push %p->%p = cur fs %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n", - save, current, current->fs, current->fs->pwd, - atomic_read(¤t->fs->pwd->d_count), - atomic_read(¤t->fs->pwd->d_inode->i_count), - current->fs->pwd->d_name.len, current->fs->pwd->d_name.name, - current->fs->pwdmnt, - atomic_read(¤t->fs->pwdmnt->mnt_count)); - */ - - save->fs = get_fs(); - LASSERT(atomic_read(¤t->fs->pwd->d_count)); - LASSERT(atomic_read(&new_ctx->pwd->d_count)); - save->pwd = dget(current->fs->pwd); - save->pwdmnt = mntget(current->fs->pwdmnt); - - LASSERT(save->pwd); - LASSERT(save->pwdmnt); - LASSERT(new_ctx->pwd); - LASSERT(new_ctx->pwdmnt); - - if (uc) { - save->fsuid = current->fsuid; - save->fsgid = current->fsgid; - save->cap = current->cap_effective; - - current->fsuid = uc->ouc_fsuid; - current->fsgid = uc->ouc_fsgid; - current->cap_effective = uc->ouc_cap; - if (uc->ouc_suppgid1 != -1) - current->groups[current->ngroups++] = uc->ouc_suppgid1; - if (uc->ouc_suppgid2 != -1) - current->groups[current->ngroups++] = uc->ouc_suppgid2; - } - set_fs(new_ctx->fs); - set_fs_pwd(current->fs, new_ctx->pwdmnt, new_ctx->pwd); - - /* - CDEBUG(D_INFO, - "= push %p->%p = cur fs %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n", - new_ctx, current, current->fs, current->fs->pwd, - atomic_read(¤t->fs->pwd->d_count), - atomic_read(¤t->fs->pwd->d_inode->i_count), - current->fs->pwd->d_name.len, current->fs->pwd->d_name.name, - current->fs->pwdmnt, - atomic_read(¤t->fs->pwdmnt->mnt_count)); - */ -} - -void pop_ctxt(struct obd_run_ctxt *saved, struct obd_run_ctxt *new_ctx, - struct obd_ucred *uc) -{ - //printk("pc0"); - ASSERT_CTXT_MAGIC(saved->magic); - //printk("pc1"); - ASSERT_KERNEL_CTXT("popping non-kernel context!\n"); - - /* - CDEBUG(D_INFO, - " = pop %p==%p = cur %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n", - new_ctx, current, current->fs, current->fs->pwd, - atomic_read(¤t->fs->pwd->d_count), - atomic_read(¤t->fs->pwd->d_inode->i_count), - current->fs->pwd->d_name.len, current->fs->pwd->d_name.name, - current->fs->pwdmnt, - atomic_read(¤t->fs->pwdmnt->mnt_count)); - */ - - LASSERT(current->fs->pwd == new_ctx->pwd); - LASSERT(current->fs->pwdmnt == new_ctx->pwdmnt); - - set_fs(saved->fs); - set_fs_pwd(current->fs, saved->pwdmnt, saved->pwd); - - dput(saved->pwd); - mntput(saved->pwdmnt); - if (uc) { - current->fsuid = saved->fsuid; - current->fsgid = saved->fsgid; - current->cap_effective = saved->cap; - - if (uc->ouc_suppgid1 != -1) - current->ngroups--; - if (uc->ouc_suppgid2 != -1) - current->ngroups--; - } - - /* - CDEBUG(D_INFO, - "= pop %p->%p = cur fs %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n", - saved, current, current->fs, current->fs->pwd, - atomic_read(¤t->fs->pwd->d_count), - atomic_read(¤t->fs->pwd->d_inode->i_count), - current->fs->pwd->d_name.len, current->fs->pwd->d_name.name, - current->fs->pwdmnt, - atomic_read(¤t->fs->pwdmnt->mnt_count)); - */ -} - -/* utility to make a file */ -struct dentry *simple_mknod(struct dentry *dir, char *name, int mode) -{ - struct dentry *dchild; - int err = 0; - ENTRY; - - ASSERT_KERNEL_CTXT("kernel doing mknod outside kernel context\n"); - CDEBUG(D_INODE, "creating file %*s\n", (int)strlen(name), name); - - dchild = lookup_one_len(name, dir, strlen(name)); - if (IS_ERR(dchild)) - GOTO(out_up, dchild); - - if (dchild->d_inode) { - if ((dchild->d_inode->i_mode & S_IFMT) != S_IFREG) - GOTO(out_err, err = -EEXIST); - - GOTO(out_up, dchild); - } - - err = vfs_create(dir->d_inode, dchild, (mode & ~S_IFMT) | S_IFREG); - if (err) - GOTO(out_err, err); - - RETURN(dchild); - -out_err: - dput(dchild); - dchild = ERR_PTR(err); -out_up: - return dchild; -} - -/* utility to make a directory */ -struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode) -{ - struct dentry *dchild; - int err = 0; - ENTRY; - - ASSERT_KERNEL_CTXT("kernel doing mkdir outside kernel context\n"); - CDEBUG(D_INODE, "creating directory %*s\n", (int)strlen(name), name); - dchild = lookup_one_len(name, dir, strlen(name)); - if (IS_ERR(dchild)) - GOTO(out_up, dchild); - - if (dchild->d_inode) { - if (!S_ISDIR(dchild->d_inode->i_mode)) - GOTO(out_err, err = -ENOTDIR); - - GOTO(out_up, dchild); - } - - err = vfs_mkdir(dir->d_inode, dchild, mode); - if (err) - GOTO(out_err, err); - - RETURN(dchild); - -out_err: - dput(dchild); - dchild = ERR_PTR(err); -out_up: - return dchild; -} - -/* - * Read a file from within kernel context. Prior to calling this - * function we should already have done a push_ctxt(). - */ -int lustre_fread(struct file *file, char *str, int len, loff_t *off) -{ - ASSERT_KERNEL_CTXT("kernel doing read outside kernel context\n"); - if (!file || !file->f_op || !file->f_op->read || !off) - RETURN(-ENOSYS); - - return file->f_op->read(file, str, len, off); -} - -/* - * Write a file from within kernel context. Prior to calling this - * function we should already have done a push_ctxt(). - */ -int lustre_fwrite(struct file *file, const char *str, int len, loff_t *off) -{ - ENTRY; - ASSERT_KERNEL_CTXT("kernel doing write outside kernel context\n"); - if (!file) - RETURN(-ENOENT); - if (!file->f_op) - RETURN(-ENOSYS); - if (!off) - RETURN(-EINVAL); - - if (!file->f_op->write) - RETURN(-EROFS); - - RETURN(file->f_op->write(file, str, len, off)); -} - -/* - * Sync a file from within kernel context. Prior to calling this - * function we should already have done a push_ctxt(). - */ -int lustre_fsync(struct file *file) -{ - ENTRY; - ASSERT_KERNEL_CTXT("kernel doing sync outside kernel context\n"); - if (!file || !file->f_op || !file->f_op->fsync) - RETURN(-ENOSYS); - - RETURN(file->f_op->fsync(file, file->f_dentry, 0)); -} - /* -------------------------------------------------------------------------- * from old lib/target.c * -------------------------------------------------------------------------- */ @@ -494,7 +260,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) } /* XXX extract a nettype and format accordingly */ - snprintf(remote_uuid.uuid, sizeof remote_uuid, + snprintf(remote_uuid.uuid, sizeof remote_uuid, "NET_"LPX64"_UUID", req->rq_peer.peer_nid); spin_lock_bh(&target->obd_processing_task_lock); @@ -505,7 +271,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn); if (tmp == NULL) - GOTO(out, -EPROTO); + GOTO(out, rc = -EPROTO); memcpy(&conn, tmp, sizeof conn); @@ -514,14 +280,13 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) GOTO(out, rc); /* lctl gets a backstage, all-access pass. */ - if (!strcmp(cluuid.uuid, "OBD_CLASS_UUID")) + if (obd_uuid_equals(&cluuid, &lctl_fake_uuid)) goto dont_check_exports; spin_lock(&target->obd_dev_lock); list_for_each(p, &target->obd_exports) { export = list_entry(p, struct obd_export, exp_obd_chain); - if (!memcmp(&cluuid, &export->exp_client_uuid, - sizeof(export->exp_client_uuid))) { + if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) { spin_unlock(&target->obd_dev_lock); LASSERT(export->exp_obd == target); @@ -602,7 +367,6 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) dlmimp->imp_client = &export->exp_obd->obd_ldlm_client; dlmimp->imp_remote_handle = conn; dlmimp->imp_obd = target; - dlmimp->imp_export = class_export_get(export); dlmimp->imp_dlm_fake = 1; dlmimp->imp_level = LUSTRE_CONN_FULL; class_import_put(dlmimp); @@ -634,7 +398,7 @@ int target_handle_disconnect(struct ptlrpc_request *req) } /* - * Recovery functions + * Recovery functions */ void target_cancel_recovery_timer(struct obd_device *obd) @@ -653,6 +417,7 @@ static void abort_delayed_replies(struct obd_device *obd) req->rq_type = PTL_RPC_MSG_ERR; ptlrpc_reply(req); list_del(&req->rq_list); + OBD_FREE(req->rq_reqmsg, req->rq_reqlen); OBD_FREE(req, sizeof *req); } } @@ -661,12 +426,21 @@ static void abort_recovery_queue(struct obd_device *obd) { struct ptlrpc_request *req; struct list_head *tmp, *n; + int rc; + list_for_each_safe(tmp, n, &obd->obd_recovery_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); DEBUG_REQ(D_ERROR, req, "aborted:"); req->rq_status = -ENOTCONN; req->rq_type = PTL_RPC_MSG_ERR; - ptlrpc_reply(req); + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, + &req->rq_repmsg); + if (rc == 0) { + ptlrpc_reply(req); + } else { + DEBUG_REQ(D_ERROR, req, + "packing failed for abort-reply; skipping"); + } list_del(&req->rq_list); class_export_put(req->rq_export); OBD_FREE(req->rq_reqmsg, req->rq_reqlen); @@ -840,7 +614,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, /* If we're processing the queue, we want don't want to queue this * message. - * + * * Also, if this request has a transno less than the one we're waiting * for, we should process it now. It could (and currently always will) * be an open request for a descriptor that was opened some time ago. @@ -997,8 +771,6 @@ static void ptlrpc_abort_reply (struct ptlrpc_request *req) spin_unlock_irqrestore (&req->rq_lock, flags); return; case PTL_INV_MD: - /* Both SENT and ACK callbacks happened */ - LASSERT (!req->rq_want_ack); return; case PTL_MD_INUSE: /* Still sending or ACK callback in progress: wait until @@ -1031,7 +803,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) wait_queue_t commit_wait; struct obd_device *obd = req->rq_export ? req->rq_export->exp_obd : NULL; - struct obd_export *exp = + struct obd_export *exp = (req->rq_export && req->rq_ack_locks[0].mode) ? req->rq_export : NULL; @@ -1057,13 +829,8 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) OBD_FREE(req->rq_repmsg, req->rq_replen); req->rq_repmsg = NULL; } - spin_lock_irqsave (&req->rq_lock, flags); - req->rq_want_ack = 0; - spin_unlock_irqrestore (&req->rq_lock, flags); - netrc = -ECOMM; - /* NB if we want this failure to simulate sending OK but - * timing out on the ACK, we'll have to be smarter about - * calling ptlrpc_abort_reply() below. */ + init_waitqueue_head(&req->rq_wait_for_rep); + netrc = 0; } /* a failed send simulates the callbacks */ -- 1.8.3.1