From 531ab50c27f10b5ac29e02b5a2cf56a887173377 Mon Sep 17 00:00:00 2001 From: huanghua Date: Fri, 11 Jul 2008 05:22:19 +0000 Subject: [PATCH] Branch b1_8_gate b=11930 i=adilger i=nikita.danilov i=alex --- lustre/mdc/mdc_request.c | 218 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 173 insertions(+), 45 deletions(-) diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 51cc8bc..204cfce 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -54,21 +54,24 @@ static int mdc_cleanup(struct obd_device *obd); extern int mds_queue_req(struct ptlrpc_request *); /* Helper that implements most of mdc_getstatus and signal_completed_replay. */ /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */ -static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid, +static int send_getstatus(struct obd_export *exp, struct ll_fid *rootfid, int level, int msg_flags) { struct ptlrpc_request *req; struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + int rc, size[3] = { sizeof(struct ptlrpc_body), + sizeof(*body), + sizeof (struct lustre_capa)}; ENTRY; - req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS, 2, size, + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, MDS_GETSTATUS, 2, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); + req->rq_export = class_export_get(exp); req->rq_send_state = level; - ptlrpc_req_set_repsize(req, 2, size); + ptlrpc_req_set_repsize(req, 3, size); mdc_pack_req_body(req, REQ_REC_OFF, 0, NULL, 0, 0); lustre_msg_add_flags(req->rq_reqmsg, msg_flags); @@ -100,8 +103,7 @@ static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid, /* This should be mdc_get_info("rootfid") */ int mdc_getstatus(struct obd_export *exp, struct ll_fid *rootfid) { - return send_getstatus(class_exp2cliimp(exp), rootfid, LUSTRE_IMP_FULL, - 0); + return send_getstatus(exp, rootfid, LUSTRE_IMP_FULL, 0); } static @@ -111,12 +113,11 @@ int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, struct obd_device *obddev = class_exp2obd(exp); struct mds_body *body; void *eadata; - int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + int size[6] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int bufcount = 2, rc; ENTRY; - + /* request message already built */ - if (ea_size != 0) { size[bufcount++] = ea_size; CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n", @@ -127,6 +128,10 @@ int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, CDEBUG(D_INODE, "reserved %u bytes for ACL\n", acl_size); } + if (mdc_exp_is_2_0_server(exp)) { + bufcount = 6; + } + ptlrpc_req_set_repsize(req, bufcount, size); mdc_enter_request(&obddev->u.cli); @@ -185,6 +190,7 @@ int mdc_getattr(struct obd_export *exp, struct ll_fid *fid, if (!req) GOTO(out, rc = -ENOMEM); + req->rq_export = class_export_get(exp); mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, ea_size, MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/); @@ -207,20 +213,31 @@ int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid, unsigned int ea_size, struct ptlrpc_request **request) { struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), namelen}; + int rc, size[4] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [REQ_REC_OFF] = sizeof(struct mds_body), + [REQ_REC_OFF + 1] = namelen}; + int bufcount = 3; + int nameoffset = REQ_REC_OFF + 1; ENTRY; + if (mdc_exp_is_2_0_server(exp)) { + size[REQ_REC_OFF + 1] = 0; + size[REQ_REC_OFF + 2] = namelen; + bufcount ++; + nameoffset ++; + } + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_GETATTR_NAME, 3, size, NULL); + MDS_GETATTR_NAME, bufcount, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); + req->rq_export = class_export_get(exp); mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, ea_size, MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/); LASSERT(strnlen(filename, namelen) == namelen - 1); - memcpy(lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, namelen), + memcpy(lustre_msg_buf(req->rq_reqmsg, nameoffset, namelen), filename, namelen); rc = mdc_getattr_common(exp, ea_size, 0, req); @@ -241,12 +258,24 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid, { struct obd_device *obddev = class_exp2obd(exp); struct ptlrpc_request *req; - int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) }; - // int size[3] = {sizeof(struct mds_body)}, bufcnt = 1; - int rc, xattr_namelen = 0, bufcnt = 2, offset; + int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [REQ_REC_OFF] = sizeof(struct mds_body), + [REQ_REC_OFF + 1] = 0, /* capa */ + [REQ_REC_OFF + 2] = 0, /* name */ + [REQ_REC_OFF + 3] = 0 }; + int rc, xattr_namelen = 0, bufcnt = 2, offset = REQ_REC_OFF + 1; void *tmp; ENTRY; + if (mdc_exp_is_2_0_server(exp)) { + bufcnt++; + offset++; + if (opcode == MDS_SETXATTR) { + size[REQ_REC_OFF] = sizeof (struct mdt_rec_setxattr); + opcode = MDS_REINT; + } + } + if (xattr_name) { xattr_namelen = strlen(xattr_name) + 1; size[bufcnt++] = xattr_namelen; @@ -261,10 +290,26 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid, if (!req) GOTO(out, rc = -ENOMEM); - /* request data */ - mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, output_size, flags); - - offset = REQ_REC_OFF + 1; + req->rq_export = class_export_get(exp); + + if (opcode == MDS_REINT && mdc_exp_is_2_0_server(exp)) { + struct mdt_rec_setxattr *rec; + rec = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, + sizeof(struct mdt_rec_setxattr)); + rec->sx_opcode = REINT_SETXATTR; + rec->sx_fsuid = current->fsuid; + rec->sx_fsgid = current->fsgid; + rec->sx_cap = current->cap_effective; + rec->sx_suppgid1 = -1; + rec->sx_suppgid2 = -1; + rec->sx_fid = *((struct lu_fid*)fid); + rec->sx_valid = valid; + rec->sx_size = output_size; + rec->sx_flags = flags; + } else { + /* request data */ + mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, output_size, flags); + } if (xattr_name) { tmp = lustre_msg_buf(req->rq_reqmsg, offset++, xattr_namelen); @@ -275,28 +320,32 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid, memcpy(tmp, input, input_size); } - /* reply buffers */ - if (opcode == MDS_GETXATTR) { - size[REPLY_REC_OFF] = sizeof(struct mds_body); + size[REPLY_REC_OFF] = sizeof(struct mds_body); + if (mdc_exp_is_2_0_server(exp)) { bufcnt = 2; } else { - bufcnt = 1; + /* reply buffers */ + if (opcode == MDS_GETXATTR) { + bufcnt = 2; + } else { + bufcnt = 1; + } + } /* we do this even output_size is 0, because server is doing that */ size[bufcnt++] = output_size; - ptlrpc_req_set_repsize(req, bufcnt, size); /* make rpc */ - if (opcode == MDS_SETXATTR) + if (opcode == MDS_SETXATTR || opcode == MDS_REINT) mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); else mdc_enter_request(&obddev->u.cli); rc = ptlrpc_queue_wait(req); - if (opcode == MDS_SETXATTR) + if (opcode == MDS_SETXATTR || opcode == MDS_REINT) mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); else mdc_exit_request(&obddev->u.cli); @@ -412,6 +461,7 @@ int mdc_req2lustre_md(struct ptlrpc_request *req, int offset, struct lustre_md *md) { int rc = 0; + int iop = mdc_req_is_2_0_server(req); ENTRY; LASSERT(md); @@ -455,17 +505,19 @@ int mdc_req2lustre_md(struct ptlrpc_request *req, int offset, } rc = 0; - offset++; - } - - if (md->body->valid & OBD_MD_FLDIREA) { + if (!iop) + offset++; + } else if (md->body->valid & OBD_MD_FLDIREA) { if(!S_ISDIR(md->body->mode)) { CERROR("OBD_MD_FLDIREA set, should be a directory, but " "is not\n"); GOTO(err_out, rc = -EPROTO); } - offset++; + if (!iop) + offset++; } + if (iop) + offset++; /* for ACL, it's possible that FLACL is set but aclsize is zero. * only when aclsize != 0 there's an actual segment for ACL in @@ -655,11 +707,11 @@ static void mdc_commit_close(struct ptlrpc_request *req) spin_unlock(&open_req->rq_lock); } -int mdc_close(struct obd_export *exp, struct obdo *oa, +int mdc_close(struct obd_export *exp, struct mdc_op_data *data, struct obdo *oa, struct obd_client_handle *och, struct ptlrpc_request **request) { struct obd_device *obd = class_exp2obd(exp); - int reqsize[2] = { sizeof(struct ptlrpc_body), + int reqsize[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) }; int rc, repsize[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body), @@ -667,12 +719,20 @@ int mdc_close(struct obd_export *exp, struct obdo *oa, obd->u.cli.cl_max_mds_cookiesize }; struct ptlrpc_request *req; struct mdc_open_data *mod; + int bufcount = 2; ENTRY; + if (mdc_exp_is_2_0_server(exp)) { + reqsize[1] = sizeof(struct mdt_epoch); + reqsize[2] = sizeof(struct mdt_rec_create); + reqsize[3] = 0; /* capa */ + bufcount = 4; + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_CLOSE, 2, reqsize, NULL); + MDS_CLOSE, bufcount, reqsize, NULL); if (req == NULL) GOTO(out, rc = -ENOMEM); + req->rq_export = class_export_get(exp); /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a * portal whose threads are not taking any DLM locks and are therefore @@ -693,12 +753,13 @@ int mdc_close(struct obd_export *exp, struct obdo *oa, GOTO(out, rc = -EIO); } mod->mod_close_req = req; + DEBUG_REQ(D_RPCTRACE, mod->mod_close_req, "close req"); DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "matched open"); } else { CDEBUG(D_RPCTRACE, "couldn't find open req; expecting error\n"); } - mdc_close_pack(req, REQ_REC_OFF, oa, oa->o_valid, och); + mdc_close_pack(req, REQ_REC_OFF, data, oa, oa->o_valid, och); ptlrpc_req_set_repsize(req, 4, repsize); req->rq_commit_cb = mdc_commit_close; @@ -742,7 +803,8 @@ int mdc_close(struct obd_export *exp, struct obdo *oa, return rc; } -int mdc_done_writing(struct obd_export *exp, struct obdo *obdo) +int mdc_done_writing(struct obd_export *exp, struct mdc_op_data *data, + struct obdo *obdo) { struct ptlrpc_request *req; struct mds_body *body; @@ -754,8 +816,9 @@ int mdc_done_writing(struct obd_export *exp, struct obdo *obdo) if (req == NULL) RETURN(-ENOMEM); + req->rq_export = class_export_get(exp); body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode); + body->fid1 = data->fid1; body->size = obdo->o_size; body->blocks = obdo->o_blocks; body->flags = obdo->o_flags; @@ -786,6 +849,7 @@ int mdc_readpage(struct obd_export *exp, struct ll_fid *fid, __u64 offset, if (req == NULL) GOTO(out, rc = -ENOMEM); + req->rq_export = class_export_get(exp); req->rq_request_portal = MDS_READPAGE_PORTAL; ptlrpc_at_set_req_timeout(req); @@ -941,6 +1005,7 @@ int mdc_set_info_async(struct obd_export *exp, obd_count keylen, if (req == NULL) RETURN(-ENOMEM); + req->rq_export = class_export_get(exp); ptlrpc_req_set_repsize(req, 1, NULL); if (set) { rc = 0; @@ -1036,21 +1101,25 @@ output: return rc; } -static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type, +static int mdc_pin(struct obd_export *exp, struct ll_fid *fid, struct obd_client_handle *handle, int flag) { struct ptlrpc_request *req; struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 }; + int bufcount = 2; ENTRY; + if (mdc_exp_is_2_0_server(exp)) + bufcount = 3; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_PIN, 2, size, NULL); + MDS_PIN, bufcount, size, NULL); if (req == NULL) RETURN(-ENOMEM); - + + req->rq_export = class_export_get(exp); body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - mdc_pack_fid(&body->fid1, ino, gen, type); + body->fid1 = *fid; body->flags = flag; ptlrpc_req_set_repsize(req, 2, size); @@ -1100,6 +1169,7 @@ static int mdc_unpin(struct obd_export *exp, if (req == NULL) RETURN(-ENOMEM); + req->rq_export = class_export_get(exp); body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); memcpy(&body->handle, &handle->och_fh, sizeof(body->handle)); body->flags = flag; @@ -1122,15 +1192,20 @@ int mdc_sync(struct obd_export *exp, struct ll_fid *fid, struct ptlrpc_request **request) { struct ptlrpc_request *req; - int size[2] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) }; + int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body), 0 }; + int bufcount = 2; int rc; ENTRY; + + if (mdc_exp_is_2_0_server(exp)) + bufcount = 3; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_SYNC, 2, size, NULL); + MDS_SYNC, bufcount, size, NULL); if (!req) RETURN(rc = -ENOMEM); + req->rq_export = class_export_get(exp); mdc_pack_req_body(req, REQ_REC_OFF, 0, fid, 0, 0); ptlrpc_req_set_repsize(req, 2, size); @@ -1373,6 +1448,57 @@ static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf) return(rc); } +static int mdc_fid_init(struct obd_export *exp) +{ + struct client_obd *cli; + char *prefix; + int rc; + ENTRY; + + cli = &exp->exp_obd->u.cli; + + OBD_ALLOC_PTR(cli->cl_seq); + if (cli->cl_seq == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(prefix, MAX_OBD_NAME + 5); + if (prefix == NULL) + GOTO(out_free_seq, rc = -ENOMEM); + + snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", exp->exp_obd->obd_name); + + /* Init client side sequence-manager */ + rc = seq_client_init(cli->cl_seq, exp, + LUSTRE_SEQ_METADATA, + LUSTRE_SEQ_MAX_WIDTH, + prefix); + OBD_FREE(prefix, MAX_OBD_NAME + 5); + if (rc) + GOTO(out_free_seq, rc); + + RETURN(rc); + +out_free_seq: + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + return rc; +} + +static int mdc_fid_fini(struct obd_export *exp) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + ENTRY; + + if (cli->cl_seq != NULL) { + LASSERT(cli->cl_seq->lcs_exp == exp); + seq_client_fini(cli->cl_seq); + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + } + + RETURN(0); +} + struct obd_ops mdc_obd_ops = { .o_owner = THIS_MODULE, .o_setup = mdc_setup, @@ -1382,6 +1508,8 @@ struct obd_ops mdc_obd_ops = { .o_del_conn = client_import_del_conn, .o_connect = client_connect_import, .o_disconnect = client_disconnect_export, + .o_fid_init = mdc_fid_init, + .o_fid_fini = mdc_fid_fini, .o_iocontrol = mdc_iocontrol, .o_set_info_async = mdc_set_info_async, .o_get_info = mdc_get_info, -- 1.8.3.1