Whamcloud - gitweb
- continue with fixing defects after cmm codeinsp
authortappro <tappro>
Fri, 23 Jun 2006 10:07:41 +0000 (10:07 +0000)
committertappro <tappro>
Fri, 23 Jun 2006 10:07:41 +0000 (10:07 +0000)
- add forgotten opcode_offset() calculations for SEQ

lustre/cmm/cmm_device.c
lustre/cmm/cmm_object.c
lustre/cmm/mdc_device.c
lustre/cmm/mdc_internal.h
lustre/cmm/mdc_object.c
lustre/include/md_object.h
lustre/ptlrpc/ptlrpc_internal.h

index fe9e4c1..c96f119 100644 (file)
@@ -89,13 +89,19 @@ static int cmm_add_mdc(const struct lu_context *ctx,
         struct  lu_device_type *ldt = &mdc_device_type;
         struct  lu_device *ld;
         struct  mdc_device *mc, *tmp;
+        char *p, *num = lustre_cfg_string(cfg, 2);
         mdsno_t mdc_num;
         int rc;
         ENTRY;
 
         /* find out that there is no such mdc */
-        LASSERT(lustre_cfg_string(cfg, 2));
-        mdc_num = simple_strtol(lustre_cfg_string(cfg, 2), NULL, 10);
+        LASSERT(num);
+        mdc_num = simple_strtol(num, &p, 10);
+        if (*p) {
+                CERROR("Invalid index in lustre_cgf, offset 2\n");
+                RETURN(-EINVAL);
+        }
+
         spin_lock(&cm->cmm_tgt_guard);
         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
                                  mc_linkage) {
@@ -153,9 +159,13 @@ static int cmm_process_config(const struct lu_context *ctx,
                 break;
         case LCFG_SETUP:
         {
-                const char *index = lustre_cfg_string(cfg, 2);
+                const char *index = lustre_cfg_string(cfg, 2), *p;
                 LASSERT(index);
-                m->cmm_local_num = simple_strtol(index, NULL, 10);
+                m->cmm_local_num = simple_strtol(index, &p, 10);
+                if (*p) {
+                        CERROR("Invalid index in lustre_cgf, offset 2\n");
+                        RETURN(-EINVAL);
+                }
                 /* no break; to pass cfg further */
         }
         default:
@@ -244,6 +254,7 @@ static struct lu_device *cmm_device_fini(const struct lu_context *ctx,
 
         fld_client_fini(&cm->cmm_fld);
         /* finish all mdc devices */
+        spin_lock(&cm->cmm_tgt_guard);
         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
                 struct lu_device *ld_m = mdc2lu_dev(mc);
 
@@ -253,6 +264,7 @@ static struct lu_device *cmm_device_fini(const struct lu_context *ctx,
                 ld->ld_type->ldt_ops->ldto_device_free(ctx, ld_m);
                 cm->cmm_tgt_count--;
         }
+        spin_unlock(&cm->cmm_tgt_guard);
 
         RETURN (md2lu_dev(cm->cmm_child));
 }
index a547404..a425d4f 100644 (file)
@@ -65,7 +65,7 @@ static int cmm_fld_lookup(struct cmm_device *cm,
                 }
         }
 
-        if (*mds >= cm->cmm_tgt_count) {
+        if (*mds > cm->cmm_tgt_count) {
                 CERROR("Got invalid mdsno: %u (max: %u)\n",
                        *mds, cm->cmm_tgt_count);
                 rc = -EINVAL;
@@ -228,7 +228,7 @@ static int cml_object_create(const struct lu_context *ctx,
 {
         int rc;
         ENTRY;
-        rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+        rc = mo_object_create(ctx, md_object_next(mo), attr);
         RETURN(rc);
 }
 
@@ -237,7 +237,7 @@ static int cml_attr_get(const struct lu_context *ctx, struct md_object *mo,
 {
         int rc;
         ENTRY;
-        rc = mo_attr_get(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+        rc = mo_attr_get(ctx, md_object_next(mo), attr);
         RETURN(rc);
 }
 
@@ -246,7 +246,7 @@ static int cml_attr_set(const struct lu_context *ctx, struct md_object *mo,
 {
         int rc;
         ENTRY;
-        rc = mo_attr_set(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+        rc = mo_attr_set(ctx, md_object_next(mo), attr);
         RETURN(rc);
 }
 
@@ -255,7 +255,7 @@ static int cml_xattr_get(const struct lu_context *ctx, struct md_object *mo,
 {
         int rc;
         ENTRY;
-        rc = mo_xattr_get(ctx, cmm2child_obj(md2cmm_obj(mo)),
+        rc = mo_xattr_get(ctx, md_object_next(mo),
                          buf, buflen, name);
         RETURN(rc);
 }
@@ -265,7 +265,7 @@ static int cml_xattr_set(const struct lu_context *ctx, struct md_object *mo,
 {
         int rc;
         ENTRY;
-        rc = mo_xattr_set(ctx, cmm2child_obj(md2cmm_obj(mo)),
+        rc = mo_xattr_set(ctx, md_object_next(mo),
                           buf, buflen, name);
         RETURN(rc);
 }
@@ -274,7 +274,7 @@ static int cml_ref_add(const struct lu_context *ctx, struct md_object *mo)
 {
         int rc;
         ENTRY;
-        rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo)));
+        rc = mo_ref_add(ctx, md_object_next(mo));
         RETURN(rc);
 }
 
@@ -282,7 +282,7 @@ static int cml_ref_del(const struct lu_context *ctx, struct md_object *mo)
 {
         int rc;
         ENTRY;
-        rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo)));
+        rc = mo_ref_del(ctx, md_object_next(mo));
         RETURN(rc);
 }
 
@@ -290,7 +290,7 @@ static int cml_open(const struct lu_context *ctx, struct md_object *mo)
 {
         int rc;
         ENTRY;
-        rc = mo_open(ctx, cmm2child_obj(md2cmm_obj(mo)));
+        rc = mo_open(ctx, md_object_next(mo));
         RETURN(rc);
 }
 
@@ -298,7 +298,7 @@ static int cml_close(const struct lu_context *ctx, struct md_object *mo)
 {
         int rc;
         ENTRY;
-        rc = mo_close(ctx, cmm2child_obj(md2cmm_obj(mo)));
+        rc = mo_close(ctx, md_object_next(mo));
         RETURN(rc);
 }
 
@@ -320,7 +320,7 @@ static int cml_lookup(const struct lu_context *ctx, struct md_object *mo_p,
 {
         int rc;
         ENTRY;
-        rc = mdo_lookup(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name, lf);
+        rc = mdo_lookup(ctx, md_object_next(mo_p), name, lf);
         RETURN(rc);
 
 }
@@ -331,8 +331,8 @@ static int cml_create(const struct lu_context *ctx,
 {
         int rc;
         ENTRY;
-        rc = mdo_create(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name,
-                        cmm2child_obj(md2cmm_obj(mo_c)), attr);
+        rc = mdo_create(ctx, md_object_next(mo_p), name,
+                        md_object_next(mo_c), attr);
         RETURN(rc);
 }
 
@@ -341,8 +341,8 @@ static int cml_link(const struct lu_context *ctx, struct md_object *mo_p,
 {
         int rc;
         ENTRY;
-        rc = mdo_link(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
-                      cmm2child_obj(md2cmm_obj(mo_s)), name);
+        rc = mdo_link(ctx, md_object_next(mo_p),
+                      md_object_next(mo_s), name);
         RETURN(rc);
 }
 
@@ -351,11 +351,12 @@ static int cml_unlink(const struct lu_context *ctx, struct md_object *mo_p,
 {
         int rc;
         ENTRY;
-        rc = mdo_unlink(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
-                        cmm2child_obj(md2cmm_obj(mo_c)), name);
+        rc = mdo_unlink(ctx, md_object_next(mo_p),
+                        md_object_next(mo_c), name);
         RETURN(rc);
 }
 
+/* rename is split to local/remote by location of new parent dir */
 static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
                        struct md_object *mo_pn, const struct lu_fid *lf,
                        const char *s_name, struct md_object *mo_t,
@@ -365,16 +366,16 @@ static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
         ENTRY;
 
         if (mo_t && !cmm_is_local_obj(md2cmm_obj(mo_t))) {
-                /* remote object */
-                rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_t)));
+                /* mo_t is remote object and there is RPC to unlink it */
+                rc = mo_ref_del(ctx, md_object_next(mo_t));
                 if (rc)
                         RETURN(rc);
                 mo_t = NULL;
         }
-
-        rc = mdo_rename(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
-                        cmm2child_obj(md2cmm_obj(mo_pn)), lf, s_name,
-                        cmm2child_obj(md2cmm_obj(mo_t)), t_name);
+        /* local rename, mo_t can be NULL */
+        rc = mdo_rename(ctx, md_object_next(mo_po),
+                        md_object_next(mo_pn), lf, s_name,
+                        md_object_next(mo_t), t_name);
 
         RETURN(rc);
 }
@@ -386,8 +387,8 @@ static int cml_rename_tgt(const struct lu_context *ctx,
         int rc;
         ENTRY;
 
-        rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
-                            cmm2child_obj(md2cmm_obj(mo_t)), lf, name);
+        rc = mdo_rename_tgt(ctx, md_object_next(mo_p),
+                            md_object_next(mo_t), lf, name);
         RETURN(rc);
 }
 
@@ -561,6 +562,16 @@ static int cmr_lookup(const struct lu_context *ctx, struct md_object *mo_p,
         RETURN(-EREMOTE);
 }
 
+/*
+ * All methods below are cross-ref by nature. They consist of remote call and
+ * local operation. Due to future rollback functionality there are several
+ * limitations for such methods:
+ * 1) remote call should be done at first to do epoch negotiation between all
+ * MDS involved and to avoid the RPC inside transaction.
+ * 2) only one RPC can be sent - also due to epoch negotiation.
+ * For more details see rollback HLD/DLD.
+ *
+ */
 static int cmr_create(const struct lu_context *ctx,
                       struct md_object *mo_p, const char *name,
                       struct md_object *mo_c, struct lu_attr *attr)
@@ -572,9 +583,9 @@ static int cmr_create(const struct lu_context *ctx,
         //XXX: make sure that MDT checks name isn't exist
 
         /* remote object creation and local name insert */
-        rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo_c)), attr);
+        rc = mo_object_create(ctx, md_object_next(mo_c), attr);
         if (rc == 0) {
-                rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+                rc = mdo_name_insert(ctx, md_object_next(mo_p),
                                      name, lu_object_fid(&mo_c->mo_lu));
         }
 
@@ -589,9 +600,9 @@ static int cmr_link(const struct lu_context *ctx, struct md_object *mo_p,
 
         //XXX: make sure that MDT checks name isn't exist
 
-        rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo_s)));
+        rc = mo_ref_add(ctx, md_object_next(mo_s));
         if (rc == 0) {
-                rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+                rc = mdo_name_insert(ctx, md_object_next(mo_p),
                                      name, lu_object_fid(&mo_s->mo_lu));
         }
 
@@ -604,9 +615,9 @@ static int cmr_unlink(const struct lu_context *ctx, struct md_object *mo_p,
         int rc;
         ENTRY;
 
-        rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
+        rc = mo_ref_del(ctx, md_object_next(mo_c));
         if (rc == 0) {
-                rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+                rc = mdo_name_remove(ctx, md_object_next(mo_p),
                                      name);
         }
 
@@ -626,16 +637,18 @@ static int cmr_rename(const struct lu_context *ctx, struct md_object *mo_po,
          * lookup and process this further */
 
         LASSERT(mo_t == NULL);
-        rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_pn)),
+        rc = mdo_rename_tgt(ctx, md_object_next(mo_pn),
                             NULL/* mo_t */, lf, t_name);
         /* only old name is removed localy */
         if (rc == 0)
-                rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+                rc = mdo_name_remove(ctx, md_object_next(mo_po),
                                      s_name);
 
         RETURN(rc);
 }
 
+/* part of cross-ref rename(). Used to insert new name in new parent 
+ * and unlink target with same name if it exists */
 static int cmr_rename_tgt(const struct lu_context *ctx,
                           struct md_object *mo_p, struct md_object *mo_t,
                           const struct lu_fid *lf, const char *name)
@@ -643,10 +656,10 @@ static int cmr_rename_tgt(const struct lu_context *ctx,
         int rc;
         ENTRY;
         /* target object is remote one */
-        rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_t)));
+        rc = mo_ref_del(ctx, md_object_next(mo_t));
         /* continue locally with name handling only */
         if (rc == 0)
-                rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+                rc = mdo_rename_tgt(ctx, md_object_next(mo_p),
                                     NULL, lf, name);
         RETURN(rc);
 }
index 4ba8e3d..feedb68 100644 (file)
@@ -50,25 +50,38 @@ static inline int lu_device_is_mdc(struct lu_device *ld)
 
 static struct md_device_operations mdc_md_ops = { 0 };
 
+/* MDC OBD is set up already and connected to the proper MDS
+ * mdc_add_obd() find that obd by uuid and connects to it.
+ * Local MDT uuid is used for connection
+ * */
 static int mdc_add_obd(struct mdc_device *mc, struct lustre_cfg *cfg)
 {
         struct mdc_cli_desc *desc = &mc->mc_desc;
         struct obd_device *mdc, *mdt;
         const char *srv = lustre_cfg_string(cfg, 0);
         const char *uuid_str = lustre_cfg_string(cfg, 1);
-        const char *index = lustre_cfg_string(cfg, 2);
+        const char *index = lustre_cfg_string(cfg, 2), *p;
         int rc = 0;
 
         ENTRY;
+        LASSERT(uuid_str);
+        LASSERT(index);
+        
+        mc->mc_num = simple_strtol(index, &p, 10);
+        if (*p) {
+                CERROR("Invalid index in lustre_cgf, offset 2\n");                
+                RETURN(-EINVAL);
+        }
 
-        //find mdt obd to get group uuid
+        /* find local MDT obd to get group uuid */
         mdt = class_name2obd(srv);
         if (mdt == NULL) {
                 CERROR("No such OBD %s\n", srv);
                 LBUG();
         }
-        obd_str2uuid(&desc->cl_cli_uuid, uuid_str);
-        mdc = class_find_client_obd(&desc->cl_cli_uuid, LUSTRE_MDC_NAME,
+        obd_str2uuid(&desc->cl_srv_uuid, uuid_str);
+        /* try to find MDC OBD connected to the needed MDT */
+        mdc = class_find_client_obd(&desc->cl_srv_uuid, LUSTRE_MDC_NAME,
                                     &mdt->obd_uuid);
         if (!mdc) {
                 CERROR("Cannot find MDC OBD connected to %s\n", uuid_str);
@@ -77,19 +90,18 @@ static int mdc_add_obd(struct mdc_device *mc, struct lustre_cfg *cfg)
                 CERROR("target %s not set up\n", mdc->obd_name);
                 rc = -EINVAL;
         } else {
-                struct lustre_handle conn = {0, };
+                struct lustre_handle *conn = &desc->cl_conn;
 
                 CDEBUG(D_CONFIG, "connect to %s(%s)\n",
                        mdc->obd_name, mdc->obd_uuid.uuid);
 
-                rc = obd_connect(&conn, mdc, &mdt->obd_uuid, NULL);
+                rc = obd_connect(conn, mdc, &mdt->obd_uuid, NULL);
 
                 if (rc) {
                         CERROR("target %s connect error %d\n",
                                mdc->obd_name, rc);
                 } else {
-                        desc->cl_exp = class_conn2export(&conn);
-                        mc->mc_num = simple_strtol(index, NULL, 10);
+                        desc->cl_exp = class_conn2export(conn);
                 }
         }
 
@@ -103,8 +115,8 @@ static int mdc_del_obd(struct mdc_device *mc)
 
         ENTRY;
 
-        CDEBUG(D_CONFIG, "disconnect from %s(%s)\n",
-               class_exp2obd(desc->cl_exp)->obd_name, desc->cl_cli_uuid.uuid);
+        CDEBUG(D_CONFIG, "disconnect from %s\n",
+               class_exp2obd(desc->cl_exp)->obd_name);
 
         rc = obd_disconnect(desc->cl_exp);
         if (rc) {
index 624d857..50e884a 100644 (file)
 #include <obd.h>
 #include <md_object.h>
 struct mdc_cli_desc {
-        struct obd_connect_data  cl_conn_data;
-        struct obd_uuid          cl_cli_uuid;
+        struct lustre_handle     cl_conn;
+        /* uuid of remote MDT to connect */
+        struct obd_uuid          cl_srv_uuid;
+        /* export of mdc obd */
         struct obd_export        *cl_exp;
 };
 
index 36c074c..54319f8 100644 (file)
@@ -108,11 +108,10 @@ static int mdc_object_create(const struct lu_context *ctx,
 
         mci = lu_context_key_get(ctx, &mdc_thread_key);
         LASSERT(mci);
-
+        
+        memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
         mci->mci_opdata.fid1 = *lu_object_fid(&mo->mo_lu);
         mci->mci_opdata.mod_time = attr->la_mtime;
-        mci->mci_opdata.name = NULL;
-        mci->mci_opdata.namelen = 0;
 
         rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
                        attr->la_mode, attr->la_uid, attr->la_gid, 0, 0,
@@ -133,9 +132,8 @@ static int mdc_ref_add(const struct lu_context *ctx, struct md_object *mo)
         mci = lu_context_key_get(ctx, &mdc_thread_key);
         LASSERT(mci);
 
+        memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
         mci->mci_opdata.fid1 = *lu_object_fid(&mo->mo_lu);
-        mci->mci_opdata.name = NULL;
-        mci->mci_opdata.namelen = 0;
 
         rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
 
@@ -154,9 +152,8 @@ static int mdc_ref_del(const struct lu_context *ctx, struct md_object *mo)
         mci = lu_context_key_get(ctx, &mdc_thread_key);
         LASSERT(mci);
 
+        memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
         mci->mci_opdata.fid1 = *lu_object_fid(&mo->mo_lu);
-        mci->mci_opdata.name = NULL;
-        mci->mci_opdata.namelen = 0;
 
         rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
 
@@ -184,6 +181,7 @@ static int mdc_rename_tgt(const struct lu_context *ctx,
         mci = lu_context_key_get(ctx, &mdc_thread_key);
         LASSERT(mci);
 
+        memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
         mci->mci_opdata.fid1 = *lu_object_fid(&mo_p->mo_lu);
         mci->mci_opdata.fid2 = *lf;
 
index 65609e9..78f29b6 100644 (file)
@@ -146,7 +146,7 @@ static inline struct md_object *lu2md(const struct lu_object *o)
 
 static inline struct md_object *md_object_next(const struct md_object *obj)
 {
-        return lu2md(lu_object_next(&obj->mo_lu));
+        return (obj ? lu2md(lu_object_next(&obj->mo_lu)) : NULL);
 }
 
 static inline struct md_device *md_obj2dev(const struct md_object *o)
index b7da6dd..7497e2e 100644 (file)
@@ -88,6 +88,14 @@ static inline int opcode_offset(__u32 opc) {
                         (LDLM_LAST_OPC - LDLM_FIRST_OPC) +
                         (MDS_LAST_OPC - MDS_FIRST_OPC) +
                         (OST_LAST_OPC - OST_FIRST_OPC));
+        } else if (opc < SEQ_LAST_OPC) {
+                /* SEQ opcode */
+                return (opc - SEQ_FIRST_OPC +
+                        (OBD_LAST_OPC - OBD_FIRST_OPC) +
+                        (LDLM_LAST_OPC - LDLM_FIRST_OPC) +
+                        (MDS_LAST_OPC - MDS_FIRST_OPC) +
+                        (OST_LAST_OPC - OST_FIRST_OPC)) +
+                        (FLD_LAST_OPC - FLD_FIRST_OPC);
         } else {
                 /* Unknown Opcode */
                 return -1;