Whamcloud - gitweb
LU-3187 ost: check pre 2.4 echo client in obdo validation
authorwang di <di.wang@intel.com>
Tue, 7 May 2013 07:00:46 +0000 (00:00 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 13 May 2013 16:30:06 +0000 (12:30 -0400)
Because old echo client still uses o_id/o_seq for objid,
but new echo client will uses FID for the objid. Add
OBD_CONNECT_FID for 2.4 echo client, so 2.4 OST will
convert o_id/o_seq to FID if the request from old echo
client.

Add local flag OBD_FL_OSTID for o_flags to indicate
OST does not support FID yet, then echo client will
still send o_id/o_seq to OST.

cleanup ost_validate_obdo

Test-Parameters: clientjob=lustre-b2_1 clientbuildno=197 testlist=sanity,obdfilter-survey
Test-Parameters: serverjob=lustre-b2_1 serverbuildno=197 testlist=sanity,obdfilter-survey
Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I6001c813b668cf53a66d0d9d74f322bad63765ed
Reviewed-on: http://review.whamcloud.com/6287
Tested-by: Hudson
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/mdt/out_handler.c
lustre/obdecho/echo_client.c
lustre/osc/osc_request.c
lustre/osp/osp_md_object.c
lustre/osp/osp_precreate.c
lustre/ost/ost_handler.c

index efcca73..1b1928d 100644 (file)
@@ -3222,13 +3222,25 @@ struct obdo {
 #define o_cksum   o_nlink
 #define o_grant_used o_data_version
 
-static inline void lustre_set_wire_obdo(struct obdo *wobdo, struct obdo *lobdo)
+static inline void lustre_set_wire_obdo(struct obd_connect_data *ocd,
+                                       struct obdo *wobdo, struct obdo *lobdo)
 {
         memcpy(wobdo, lobdo, sizeof(*lobdo));
-        wobdo->o_flags &= ~OBD_FL_LOCAL_MASK;
+       wobdo->o_flags &= ~OBD_FL_LOCAL_MASK;
+       if (ocd == NULL)
+               return;
+
+       if (unlikely(!(ocd->ocd_connect_flags & OBD_CONNECT_FID)) &&
+           fid_seq_is_echo(fid_seq(&lobdo->o_oi.oi_fid))) {
+               /* Currently OBD_FL_OSTID will only be used when 2.4 echo
+                * client communicate with pre-2.4 server */
+               wobdo->o_oi.oi.oi_id = fid_oid(&lobdo->o_oi.oi_fid);
+               wobdo->o_oi.oi.oi_seq = fid_seq(&lobdo->o_oi.oi_fid);
+       }
 }
 
-static inline void lustre_get_wire_obdo(struct obdo *lobdo, struct obdo *wobdo)
+static inline void lustre_get_wire_obdo(struct obd_connect_data *ocd,
+                                       struct obdo *lobdo, struct obdo *wobdo)
 {
         obd_flag local_flags = 0;
 
@@ -3237,12 +3249,22 @@ static inline void lustre_get_wire_obdo(struct obdo *lobdo, struct obdo *wobdo)
 
         LASSERT(!(wobdo->o_flags & OBD_FL_LOCAL_MASK));
 
-        memcpy(lobdo, wobdo, sizeof(*lobdo));
-        if (local_flags != 0) {
-                 lobdo->o_valid |= OBD_MD_FLFLAGS;
-                 lobdo->o_flags &= ~OBD_FL_LOCAL_MASK;
-                 lobdo->o_flags |= local_flags;
-        }
+       memcpy(lobdo, wobdo, sizeof(*lobdo));
+       if (local_flags != 0) {
+               lobdo->o_valid |= OBD_MD_FLFLAGS;
+               lobdo->o_flags &= ~OBD_FL_LOCAL_MASK;
+               lobdo->o_flags |= local_flags;
+       }
+       if (ocd == NULL)
+               return;
+
+       if (unlikely(!(ocd->ocd_connect_flags & OBD_CONNECT_FID)) &&
+           fid_seq_is_echo(wobdo->o_oi.oi.oi_seq)) {
+               /* see above */
+               lobdo->o_oi.oi_fid.f_seq = wobdo->o_oi.oi.oi_seq;
+               lobdo->o_oi.oi_fid.f_oid = wobdo->o_oi.oi.oi_id;
+               lobdo->o_oi.oi_fid.f_ver = 0;
+       }
 }
 
 extern void lustre_swab_obdo (struct obdo *o);
index 0f4424d..f0b0daa 100644 (file)
@@ -320,7 +320,7 @@ static int out_create(struct out_thread_info *info)
        }
 
        obdo_le_to_cpu(wobdo, wobdo);
-       lustre_get_wire_obdo(lobdo, wobdo);
+       lustre_get_wire_obdo(NULL, lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
 
        dof->dof_type = dt_mode_to_dft(attr->la_mode);
@@ -432,7 +432,7 @@ static int out_attr_set(struct out_thread_info *info)
        attr->la_valid = 0;
        attr->la_valid = 0;
        obdo_le_to_cpu(wobdo, wobdo);
-       lustre_get_wire_obdo(lobdo, wobdo);
+       lustre_get_wire_obdo(NULL, lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
 
        rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle,
@@ -499,7 +499,7 @@ static int out_attr_get(struct out_thread_info *info)
        obdo->o_valid = 0;
        obdo_from_la(obdo, la, la->la_valid);
        obdo_cpu_to_le(obdo, obdo);
-       lustre_set_wire_obdo(obdo, obdo);
+       lustre_set_wire_obdo(NULL, obdo, obdo);
 
 out_unlock:
        dt_read_unlock(env, obj);
index bbb5f52..126a985 100644 (file)
@@ -3040,7 +3040,8 @@ static int echo_client_setup(const struct lu_env *env,
         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
                                 OBD_CONNECT_BRW_SIZE |
                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
-                                OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE;
+                                OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
+                                OBD_CONNECT_FID;
        ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
         ocd->ocd_version = LUSTRE_VERSION_CODE;
         ocd->ocd_group = FID_SEQ_ECHO;
index f73f218..ee03ef5 100644 (file)
@@ -179,13 +179,14 @@ static inline void osc_pack_capa(struct ptlrpc_request *req,
 static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                      struct obd_info *oinfo)
 {
-        struct ost_body *body;
+       struct ost_body *body;
 
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body);
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
 
-        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
-        osc_pack_capa(req, body, oinfo->oi_capa);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
+                            oinfo->oi_oa);
+       osc_pack_capa(req, body, oinfo->oi_capa);
 }
 
 static inline void osc_set_capa_size(struct ptlrpc_request *req,
@@ -211,8 +212,9 @@ static int osc_getattr_interpret(const struct lu_env *env,
 
         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         if (body) {
-                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
+               CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
+               lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
+                                    aa->aa_oi->oi_oa, &body->oa);
 
                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
@@ -290,8 +292,9 @@ static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
+       CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
+                            &body->oa);
 
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
@@ -335,7 +338,8 @@ static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
+                            &body->oa);
 
         EXIT;
 out:
@@ -357,7 +361,8 @@ static int osc_setattr_interpret(const struct lu_env *env,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
+                            &body->oa);
 out:
         rc = sa->sa_upcall(sa->sa_cookie, rc);
         RETURN(rc);
@@ -453,7 +458,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        lustre_set_wire_obdo(&body->oa, oa);
+
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 
         ptlrpc_request_set_replen(req);
 
@@ -473,7 +479,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
         if (body == NULL)
                 GOTO(out_req, rc = -EPROTO);
 
-        lustre_get_wire_obdo(oa, &body->oa);
+       CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
 
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;
@@ -528,10 +535,11 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
         ptlrpc_at_set_req_timeout(req);
 
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body);
-        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
-        osc_pack_capa(req, body, oinfo->oi_capa);
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
+                            oinfo->oi_oa);
+       osc_pack_capa(req, body, oinfo->oi_capa);
 
         ptlrpc_request_set_replen(req);
 
@@ -604,11 +612,12 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
         }
 
-        /* overload the size and blocks fields in the oa with start/end */
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body);
-        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
-        osc_pack_capa(req, body, oinfo->oi_capa);
+       /* overload the size and blocks fields in the oa with start/end */
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
+                            oinfo->oi_oa);
+       osc_pack_capa(req, body, oinfo->oi_capa);
 
         ptlrpc_request_set_replen(req);
         req->rq_interpret_reply = osc_sync_interpret;
@@ -781,11 +790,11 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
         ptlrpc_at_set_req_timeout(req);
 
-        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
-                oa->o_lcookie = *oti->oti_logcookies;
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body);
-        lustre_set_wire_obdo(&body->oa, oa);
+       if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
+               oa->o_lcookie = *oti->oti_logcookies;
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 
         osc_pack_capa(req, body, (struct obd_capa *)capa);
         ptlrpc_request_set_replen(req);
@@ -1305,7 +1314,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
 
-        lustre_set_wire_obdo(&body->oa, oa);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 
        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
@@ -1636,8 +1645,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 rc = 0;
         }
 out:
-        if (rc >= 0)
-                lustre_get_wire_obdo(aa->aa_oa, &body->oa);
+       if (rc >= 0)
+               lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
+                                    aa->aa_oa, &body->oa);
 
         RETURN(rc);
 }
index 4532233..390000c 100644 (file)
@@ -322,7 +322,7 @@ static int osp_get_attr_from_req(const struct lu_env *env,
                return -EPROTO;
 
        obdo_le_to_cpu(wobdo, wobdo);
-       lustre_get_wire_obdo(lobdo, wobdo);
+       lustre_get_wire_obdo(NULL, lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
 
        return 0;
@@ -355,7 +355,7 @@ static int osp_md_declare_object_create(const struct lu_env *env,
        osi->osi_obdo.o_valid = 0;
        LASSERT(S_ISDIR(attr->la_mode));
        obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
-       lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
+       lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
        obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
 
        bufs[0] = (char *)&osi->osi_obdo;
@@ -545,7 +545,7 @@ static int osp_md_declare_attr_set(const struct lu_env *env,
        LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
        obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
                     attr->la_valid);
-       lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
+       lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
        obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
 
        buf = (char *)&osi->osi_obdo;
index 073bd4b..18e11cb 100644 (file)
@@ -1219,7 +1219,7 @@ int osp_object_truncate(const struct lu_env *env, struct dt_object *dt,
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
-       lustre_set_wire_obdo(&body->oa, oa);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 
        /* XXX: capa support? */
        /* osc_pack_capa(req, body, capa); */
index 3f7e207..81003e7 100644 (file)
@@ -103,25 +103,50 @@ static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
  * Validate oa from client.
  * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
  * req are valid.
- *    a. for single MDS  seq = FID_SEQ_OST_MDT0,
- *    b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX
+ *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
+ *    b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
+ *       pack ost_id. Because non-zero oi_seq will make it diffcult to tell
+ *       whether this is oi_fid or real ostid. So it will check
+ *       OBD_CONNECT_FID, then convert the ostid to FID for old client.
+ *    c. Old FID-disable osc will send IDIF.
+ *    d. new FID-enable osc/osp will send normal FID.
+ *
+ * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
+ * be used for LAST_ID file, and only being accessed inside OST now.
  */
 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
                             struct obd_ioobj *ioobj)
 {
-       if (unlikely(oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP))) {
-               ostid_set_seq_mdt0(&oa->o_oi);
-               if (ioobj)
-                       ostid_set_seq_mdt0(&ioobj->ioo_oid);
-       } else if (unlikely(oa == NULL ||
-                           !(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
-                             fid_seq_is_mdt(ostid_seq(&oa->o_oi)) ||
-                             fid_seq_is_echo(ostid_seq(&oa->o_oi))))) {
-               CERROR("%s: client %s sent bad object "DOSTID": rc = -EPROTO\n",
-                      exp->exp_obd->obd_name, obd_export_nid2str(exp),
-                      oa ? ostid_seq(&oa->o_oi) : -1,
-                      oa ? ostid_id(&oa->o_oi) : -1);
-               return -EPROTO;
+       int rc = 0;
+
+       if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
+                    fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
+               /* Sigh 2.[123] client still sends echo req with oi_id = 0
+                * during create, and we will reset this to 1, since this
+                * oi_id is basically useless in the following create process,
+                * but oi_id == 0 will make it difficult to tell whether it is
+                * real FID or ost_id. */
+               oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
+               oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
+               oa->o_oi.oi_fid.f_ver = 0;
+       } else {
+               if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
+                       GOTO(out, rc = -EPROTO);
+
+               /* Note: this check might be forced in 2.5 or 2.6, i.e.
+                * all of the requests are required to setup FLGROUP */
+               if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
+                       ostid_set_seq_mdt0(&oa->o_oi);
+                       if (ioobj)
+                               ostid_set_seq_mdt0(&ioobj->ioo_oid);
+                       oa->o_valid |= OBD_MD_FLGROUP;
+               }
+
+               if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
+                              fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
+                              fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
+                              fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
+                       GOTO(out, rc = -EPROTO);
        }
 
        if (ioobj != NULL) {
@@ -132,11 +157,18 @@ static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
                               ": rc = -EPROTO\n", exp->exp_obd->obd_name,
                               obd_export_nid2str(exp), max_brw,
                               POSTID(&oa->o_oi));
-                       return -EPROTO;
+                       GOTO(out, rc = -EPROTO);
                }
                ioobj->ioo_oid = oa->o_oi;
        }
-       return 0;
+
+out:
+       if (rc != 0)
+               CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
+                      exp->exp_obd->obd_name, obd_export_nid2str(exp),
+                      oa ? ostid_seq(&oa->o_oi) : -1,
+                      oa ? ostid_id(&oa->o_oi) : -1, rc);
+       return rc;
 }
 
 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)