From: wang di Date: Tue, 7 May 2013 07:00:46 +0000 (-0700) Subject: LU-3187 ost: check pre 2.4 echo client in obdo validation X-Git-Tag: 2.4.0-RC1~12 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=00d9dff4fa51321b2185fcdc381ee8edb6ca62ce LU-3187 ost: check pre 2.4 echo client in obdo validation Because old echo client still uses o_id/o_seq for objid, but new echo client will uses FID for the objid. Add OBD_CONNECT_FID for 2.4 echo client, so 2.4 OST will convert o_id/o_seq to FID if the request from old echo client. Add local flag OBD_FL_OSTID for o_flags to indicate OST does not support FID yet, then echo client will still send o_id/o_seq to OST. cleanup ost_validate_obdo Test-Parameters: clientjob=lustre-b2_1 clientbuildno=197 testlist=sanity,obdfilter-survey Test-Parameters: serverjob=lustre-b2_1 serverbuildno=197 testlist=sanity,obdfilter-survey Signed-off-by: wang di Change-Id: I6001c813b668cf53a66d0d9d74f322bad63765ed Reviewed-on: http://review.whamcloud.com/6287 Tested-by: Hudson Reviewed-by: Andreas Dilger Tested-by: Maloo Reviewed-by: Mike Pershin --- diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index efcca73..1b1928d 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -3222,13 +3222,25 @@ struct obdo { #define o_cksum o_nlink #define o_grant_used o_data_version -static inline void lustre_set_wire_obdo(struct obdo *wobdo, struct obdo *lobdo) +static inline void lustre_set_wire_obdo(struct obd_connect_data *ocd, + struct obdo *wobdo, struct obdo *lobdo) { memcpy(wobdo, lobdo, sizeof(*lobdo)); - wobdo->o_flags &= ~OBD_FL_LOCAL_MASK; + wobdo->o_flags &= ~OBD_FL_LOCAL_MASK; + if (ocd == NULL) + return; + + if (unlikely(!(ocd->ocd_connect_flags & OBD_CONNECT_FID)) && + fid_seq_is_echo(fid_seq(&lobdo->o_oi.oi_fid))) { + /* Currently OBD_FL_OSTID will only be used when 2.4 echo + * client communicate with pre-2.4 server */ + wobdo->o_oi.oi.oi_id = fid_oid(&lobdo->o_oi.oi_fid); + wobdo->o_oi.oi.oi_seq = fid_seq(&lobdo->o_oi.oi_fid); + } } -static inline void lustre_get_wire_obdo(struct obdo *lobdo, struct obdo *wobdo) +static inline void lustre_get_wire_obdo(struct obd_connect_data *ocd, + struct obdo *lobdo, struct obdo *wobdo) { obd_flag local_flags = 0; @@ -3237,12 +3249,22 @@ static inline void lustre_get_wire_obdo(struct obdo *lobdo, struct obdo *wobdo) LASSERT(!(wobdo->o_flags & OBD_FL_LOCAL_MASK)); - memcpy(lobdo, wobdo, sizeof(*lobdo)); - if (local_flags != 0) { - lobdo->o_valid |= OBD_MD_FLFLAGS; - lobdo->o_flags &= ~OBD_FL_LOCAL_MASK; - lobdo->o_flags |= local_flags; - } + memcpy(lobdo, wobdo, sizeof(*lobdo)); + if (local_flags != 0) { + lobdo->o_valid |= OBD_MD_FLFLAGS; + lobdo->o_flags &= ~OBD_FL_LOCAL_MASK; + lobdo->o_flags |= local_flags; + } + if (ocd == NULL) + return; + + if (unlikely(!(ocd->ocd_connect_flags & OBD_CONNECT_FID)) && + fid_seq_is_echo(wobdo->o_oi.oi.oi_seq)) { + /* see above */ + lobdo->o_oi.oi_fid.f_seq = wobdo->o_oi.oi.oi_seq; + lobdo->o_oi.oi_fid.f_oid = wobdo->o_oi.oi.oi_id; + lobdo->o_oi.oi_fid.f_ver = 0; + } } extern void lustre_swab_obdo (struct obdo *o); diff --git a/lustre/mdt/out_handler.c b/lustre/mdt/out_handler.c index 0f4424d..f0b0daa 100644 --- a/lustre/mdt/out_handler.c +++ b/lustre/mdt/out_handler.c @@ -320,7 +320,7 @@ static int out_create(struct out_thread_info *info) } obdo_le_to_cpu(wobdo, wobdo); - lustre_get_wire_obdo(lobdo, wobdo); + lustre_get_wire_obdo(NULL, lobdo, wobdo); la_from_obdo(attr, lobdo, lobdo->o_valid); dof->dof_type = dt_mode_to_dft(attr->la_mode); @@ -432,7 +432,7 @@ static int out_attr_set(struct out_thread_info *info) attr->la_valid = 0; attr->la_valid = 0; obdo_le_to_cpu(wobdo, wobdo); - lustre_get_wire_obdo(lobdo, wobdo); + lustre_get_wire_obdo(NULL, lobdo, wobdo); la_from_obdo(attr, lobdo, lobdo->o_valid); rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle, @@ -499,7 +499,7 @@ static int out_attr_get(struct out_thread_info *info) obdo->o_valid = 0; obdo_from_la(obdo, la, la->la_valid); obdo_cpu_to_le(obdo, obdo); - lustre_set_wire_obdo(obdo, obdo); + lustre_set_wire_obdo(NULL, obdo, obdo); out_unlock: dt_read_unlock(env, obj); diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index bbb5f52..126a985 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -3040,7 +3040,8 @@ static int echo_client_setup(const struct lu_env *env, ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 | - OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE; + OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE | + OBD_CONNECT_FID; ocd->ocd_brw_size = DT_MAX_BRW_SIZE; ocd->ocd_version = LUSTRE_VERSION_CODE; ocd->ocd_group = FID_SEQ_ECHO; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index f73f218..ee03ef5 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -179,13 +179,14 @@ static inline void osc_pack_capa(struct ptlrpc_request *req, static inline void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo) { - struct ost_body *body; + struct ost_body *body; - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); - lustre_set_wire_obdo(&body->oa, oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, + oinfo->oi_oa); + osc_pack_capa(req, body, oinfo->oi_capa); } static inline void osc_set_capa_size(struct ptlrpc_request *req, @@ -211,8 +212,9 @@ static int osc_getattr_interpret(const struct lu_env *env, body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); if (body) { - CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa); + CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, + aa->aa_oi->oi_oa, &body->oa); /* This should really be sent by the OST */ aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE; @@ -290,8 +292,9 @@ static int osc_getattr(const struct lu_env *env, struct obd_export *exp, if (body == NULL) GOTO(out, rc = -EPROTO); - CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(oinfo->oi_oa, &body->oa); + CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, + &body->oa); oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; @@ -335,7 +338,8 @@ static int osc_setattr(const struct lu_env *env, struct obd_export *exp, if (body == NULL) GOTO(out, rc = -EPROTO); - lustre_get_wire_obdo(oinfo->oi_oa, &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, + &body->oa); EXIT; out: @@ -357,7 +361,8 @@ static int osc_setattr_interpret(const struct lu_env *env, if (body == NULL) GOTO(out, rc = -EPROTO); - lustre_get_wire_obdo(sa->sa_oa, &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa, + &body->oa); out: rc = sa->sa_upcall(sa->sa_cookie, rc); RETURN(rc); @@ -453,7 +458,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&body->oa, oa); + + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); ptlrpc_request_set_replen(req); @@ -473,7 +479,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, if (body == NULL) GOTO(out_req, rc = -EPROTO); - lustre_get_wire_obdo(oa, &body->oa); + CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); oa->o_blksize = cli_brw_size(exp->exp_obd); oa->o_valid |= OBD_MD_FLBLKSZ; @@ -528,10 +535,11 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - lustre_set_wire_obdo(&body->oa, oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, + oinfo->oi_oa); + osc_pack_capa(req, body, oinfo->oi_capa); ptlrpc_request_set_replen(req); @@ -604,11 +612,12 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, RETURN(rc); } - /* overload the size and blocks fields in the oa with start/end */ - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - lustre_set_wire_obdo(&body->oa, oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + /* overload the size and blocks fields in the oa with start/end */ + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, + oinfo->oi_oa); + osc_pack_capa(req, body, oinfo->oi_capa); ptlrpc_request_set_replen(req); req->rq_interpret_reply = osc_sync_interpret; @@ -781,11 +790,11 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); - if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) - oa->o_lcookie = *oti->oti_logcookies; - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - lustre_set_wire_obdo(&body->oa, oa); + if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) + oa->o_lcookie = *oti->oti_logcookies; + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); osc_pack_capa(req, body, (struct obd_capa *)capa); ptlrpc_request_set_replen(req); @@ -1305,7 +1314,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); LASSERT(body != NULL && ioobj != NULL && niobuf != NULL); - lustre_set_wire_obdo(&body->oa, oa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); obdo_to_ioobj(oa, ioobj); ioobj->ioo_bufcnt = niocount; @@ -1636,8 +1645,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) rc = 0; } out: - if (rc >= 0) - lustre_get_wire_obdo(aa->aa_oa, &body->oa); + if (rc >= 0) + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, + aa->aa_oa, &body->oa); RETURN(rc); } diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 4532233..390000c 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -322,7 +322,7 @@ static int osp_get_attr_from_req(const struct lu_env *env, return -EPROTO; obdo_le_to_cpu(wobdo, wobdo); - lustre_get_wire_obdo(lobdo, wobdo); + lustre_get_wire_obdo(NULL, lobdo, wobdo); la_from_obdo(attr, lobdo, lobdo->o_valid); return 0; @@ -355,7 +355,7 @@ static int osp_md_declare_object_create(const struct lu_env *env, osi->osi_obdo.o_valid = 0; LASSERT(S_ISDIR(attr->la_mode)); obdo_from_la(&osi->osi_obdo, attr, attr->la_valid); - lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo); + lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo); bufs[0] = (char *)&osi->osi_obdo; @@ -545,7 +545,7 @@ static int osp_md_declare_attr_set(const struct lu_env *env, LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE))); obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr, attr->la_valid); - lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo); + lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo); buf = (char *)&osi->osi_obdo; diff --git a/lustre/osp/osp_precreate.c b/lustre/osp/osp_precreate.c index 073bd4b..18e11cb 100644 --- a/lustre/osp/osp_precreate.c +++ b/lustre/osp/osp_precreate.c @@ -1219,7 +1219,7 @@ int osp_object_truncate(const struct lu_env *env, struct dt_object *dt, body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&body->oa, oa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); /* XXX: capa support? */ /* osc_pack_capa(req, body, capa); */ diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 3f7e207..81003e7 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -103,25 +103,50 @@ static void ost_drop_id(struct obd_export *exp, struct obdo *oa) * Validate oa from client. * If the request comes from 2.0 clients, currently only RSVD seq and IDIF * req are valid. - * a. for single MDS seq = FID_SEQ_OST_MDT0, - * b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX + * a. objects in Single MDT FS seq = FID_SEQ_OST_MDT0, oi_id != 0 + * b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to + * pack ost_id. Because non-zero oi_seq will make it diffcult to tell + * whether this is oi_fid or real ostid. So it will check + * OBD_CONNECT_FID, then convert the ostid to FID for old client. + * c. Old FID-disable osc will send IDIF. + * d. new FID-enable osc/osp will send normal FID. + * + * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will + * be used for LAST_ID file, and only being accessed inside OST now. */ static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa, struct obd_ioobj *ioobj) { - if (unlikely(oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP))) { - ostid_set_seq_mdt0(&oa->o_oi); - if (ioobj) - ostid_set_seq_mdt0(&ioobj->ioo_oid); - } else if (unlikely(oa == NULL || - !(fid_seq_is_idif(ostid_seq(&oa->o_oi)) || - fid_seq_is_mdt(ostid_seq(&oa->o_oi)) || - fid_seq_is_echo(ostid_seq(&oa->o_oi))))) { - CERROR("%s: client %s sent bad object "DOSTID": rc = -EPROTO\n", - exp->exp_obd->obd_name, obd_export_nid2str(exp), - oa ? ostid_seq(&oa->o_oi) : -1, - oa ? ostid_id(&oa->o_oi) : -1); - return -EPROTO; + int rc = 0; + + if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) && + fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) { + /* Sigh 2.[123] client still sends echo req with oi_id = 0 + * during create, and we will reset this to 1, since this + * oi_id is basically useless in the following create process, + * but oi_id == 0 will make it difficult to tell whether it is + * real FID or ost_id. */ + oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1; + oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO; + oa->o_oi.oi_fid.f_ver = 0; + } else { + if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0)) + GOTO(out, rc = -EPROTO); + + /* Note: this check might be forced in 2.5 or 2.6, i.e. + * all of the requests are required to setup FLGROUP */ + if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) { + ostid_set_seq_mdt0(&oa->o_oi); + if (ioobj) + ostid_set_seq_mdt0(&ioobj->ioo_oid); + oa->o_valid |= OBD_MD_FLGROUP; + } + + if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) || + fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) || + fid_seq_is_norm(ostid_seq(&oa->o_oi)) || + fid_seq_is_echo(ostid_seq(&oa->o_oi))))) + GOTO(out, rc = -EPROTO); } if (ioobj != NULL) { @@ -132,11 +157,18 @@ static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa, ": rc = -EPROTO\n", exp->exp_obd->obd_name, obd_export_nid2str(exp), max_brw, POSTID(&oa->o_oi)); - return -EPROTO; + GOTO(out, rc = -EPROTO); } ioobj->ioo_oid = oa->o_oi; } - return 0; + +out: + if (rc != 0) + CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n", + exp->exp_obd->obd_name, obd_export_nid2str(exp), + oa ? ostid_seq(&oa->o_oi) : -1, + oa ? ostid_id(&oa->o_oi) : -1, rc); + return rc; } void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)