Whamcloud - gitweb
LU-1244 obdecho: destroy ost objects for md echo client.
authorwangdi <di.wang@whamcloud.com>
Wed, 21 Mar 2012 00:27:55 +0000 (17:27 -0700)
committerOleg Drokin <green@whamcloud.com>
Fri, 6 Apr 2012 03:13:28 +0000 (23:13 -0400)
Since md echo client connects MDT directly, and there are no
lov for echo client, so it will destroy the ost object in
mdd_unlink directly.

In setxattr test, it should set another EA, instead of LOV,
which would cause problem in the following test.

Reset valid before unlink and getattr.

Signed-off-by: Di Wang <di.wang@whamcloud.com>
Change-Id: Id3da42fac4af944ceef7db34f13e8f2b40f733b6
Reviewed-on: http://review.whamcloud.com/2356
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre/lustre_idl.h
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lov.c
lustre/obdecho/echo_client.c
lustre/utils/obd.c

index 29af57d..bc57906 100644 (file)
@@ -1943,6 +1943,7 @@ enum {
         MDS_CLOSE_CLEANUP = 1 << 6,
         MDS_KEEP_ORPHAN   = 1 << 7,
         MDS_RECOV_OPEN    = 1 << 8,
+        MDS_UNLINK_DESTROY = 1 << 9,  /* Destory ost object in mdd_unlink */
 };
 
 /* instance of mdt_reint_rec */
index 21cd018..15be75a 100644 (file)
@@ -1086,6 +1086,18 @@ out_trans:
 
 stop:
         mdd_trans_stop(env, mdd, rc, handle);
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,3,50,0)
+        if (rc == 0 && ma->ma_valid & MA_COOKIE && ma->ma_valid & MA_LOV &&
+            ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_UNLINK_DESTROY)
+                /* Since echo client is incapable of destorying ost object,
+                 * it will destory the object here. */
+                rc = mdd_lovobj_unlink(env, mdd, mdd_cobj, la,
+                                       ma->ma_lmm, ma->ma_lmm_size,
+                                       ma->ma_cookie, 1);
+#else
+#warning "please remove this after 2.4 (LOD/OSP)."
+#endif
+
 #ifdef HAVE_QUOTA_SUPPORT
         if (quota_opc)
                 /* Trigger dqrel on the owner of child and parent. If failed,
index fb9bf4b..4d312b7 100644 (file)
@@ -318,6 +318,11 @@ int mdd_get_cookie_size(const struct lu_env *env, struct mdd_device *mdd,
 int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj,
                           struct lov_mds_md *lmm, int lmm_size,
                           struct llog_cookie *logcookies);
+int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
+                      struct mdd_object *obj, struct lu_attr *la,
+                      struct lov_mds_md *lmm, int lmm_size,
+                      struct llog_cookie *logcookies,
+                      int log_unlink);
 
 struct mdd_thread_info *mdd_env_info(const struct lu_env *env);
 
index be39bcf..56691e1 100644 (file)
@@ -595,7 +595,6 @@ out_ids:
  * used when destroying orphans and from mds_reint_unlink() when MDS wants to
  * destroy objects on OSS.
  */
-static
 int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
                       struct mdd_object *obj, struct lu_attr *la,
                       struct lov_mds_md *lmm, int lmm_size,
@@ -644,7 +643,7 @@ int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
 }
 
 /*
- * called with obj locked. 
+ * called with obj locked.
  */
 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
                     struct mdd_object *obj, struct lu_attr *la)
index ea9f293..51aff35 100644 (file)
@@ -209,6 +209,7 @@ struct echo_thread_info {
         struct md_attr          eti_ma;
         struct lu_name          eti_lname;
         char                    eti_name[20];
+        struct lu_buf           eti_buf;
         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
 };
 
@@ -1460,36 +1461,35 @@ out_put:
 
 static int echo_set_lmm_size(const struct lu_env *env,
                              struct lu_device *ld,
-                             struct md_attr *ma,
-                             int *max_lmm_size)
+                             struct md_attr *ma)
 {
-        struct echo_thread_info *info = echo_env_info(env);
         struct md_device *md = lu2md_dev(ld);
-        int tmp, rc;
+        int lmm_size, cookie_size, rc;
         ENTRY;
 
-        LASSERT(max_lmm_size != NULL);
-        if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
-                ma->ma_lmm = (void *)&info->eti_lmm;
-                ma->ma_lmm_size = sizeof(info->eti_lmm);
-                *max_lmm_size = 0;
-                RETURN(0);
-        }
-
         md = lu2md_dev(ld);
         rc = md->md_ops->mdo_maxsize_get(env, md,
-                                         max_lmm_size, &tmp);
+                                         &lmm_size, &cookie_size);
         if (rc)
                 RETURN(rc);
 
-        if (*max_lmm_size == 0)
-                /* In case xattr is set in echo_setattr_object */
-                *max_lmm_size = sizeof(struct lov_user_md_v3);
+        ma->ma_lmm_size = lmm_size;
+        if (lmm_size > 0) {
+                OBD_ALLOC(ma->ma_lmm, lmm_size);
+                if (ma->ma_lmm == NULL) {
+                        ma->ma_lmm_size = 0;
+                        RETURN(-ENOMEM);
+                }
+        }
 
-        ma->ma_lmm_size = *max_lmm_size;
-        OBD_ALLOC(ma->ma_lmm, ma->ma_lmm_size);
-        if (ma->ma_lmm == NULL)
-                RETURN(-ENOMEM);
+        ma->ma_cookie_size = cookie_size;
+        if (cookie_size > 0) {
+                OBD_ALLOC(ma->ma_cookie, cookie_size);
+                if (ma->ma_cookie == NULL) {
+                        ma->ma_cookie_size = 0;
+                        RETURN(-ENOMEM);
+                }
+        }
 
         RETURN(0);
 }
@@ -1509,7 +1509,6 @@ static int echo_create_md_object(const struct lu_env *env,
         struct md_attr          *ma = &info->eti_ma;
         struct lu_device        *ld = ed->ed_next;
         int                      rc = 0;
-        int                      max_lmm_size = 0;
         int                      i;
 
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
@@ -1522,7 +1521,7 @@ static int echo_create_md_object(const struct lu_env *env,
         memset(spec, 0, sizeof(*spec));
         if (stripe_count != 0) {
                 spec->sp_cr_flags |= FMODE_WRITE;
-                rc = echo_set_lmm_size(env, ld, ma, &max_lmm_size);
+                rc = echo_set_lmm_size(env, ld, ma);
                 if (rc)
                         GOTO(out_free, rc);
                 if (stripe_count != -1) {
@@ -1567,9 +1566,10 @@ static int echo_create_md_object(const struct lu_env *env,
         }
 
 out_free:
-        if (!strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME) &&
-             max_lmm_size > 0  && ma->ma_lmm != NULL)
-                OBD_FREE(ma->ma_lmm, max_lmm_size);
+        if (ma->ma_lmm_size > 0 && ma->ma_lmm != NULL)
+                OBD_FREE(ma->ma_lmm, ma->ma_lmm_size);
+        if (ma->ma_cookie_size > 0 && ma->ma_cookie != NULL)
+                OBD_FREE(ma->ma_cookie, ma->ma_cookie_size);
 
         return rc;
 }
@@ -1607,9 +1607,8 @@ static int echo_setattr_object(const struct lu_env *env,
         struct echo_thread_info *info = echo_env_info(env);
         struct lu_name          *lname = &info->eti_lname;
         char                    *name = info->eti_name;
-        struct md_attr          *ma = &info->eti_ma;
         struct lu_device        *ld = ed->ed_next;
-        struct lov_user_md_v3   *lum = &info->eti_lum;
+        struct lu_buf           *buf = &info->eti_buf;
         int                      rc = 0;
         int                      i;
 
@@ -1619,15 +1618,8 @@ static int echo_setattr_object(const struct lu_env *env,
                 return PTR_ERR(parent);
         }
 
-        memset(ma, 0, sizeof(*ma));
-        lum->lmm_magic = LOV_USER_MAGIC_V3;
-        lum->lmm_stripe_count = 1;
-        lum->lmm_stripe_offset = -1;
-        lum->lmm_pattern = 0;
-
-        ma->ma_lmm = (struct lov_mds_md *)lum;
-        ma->ma_lmm_size = sizeof(*lum);
-        ma->ma_valid = MA_LOV | MA_HSM;
+        buf->lb_buf = info->eti_xattr_buf;
+        buf->lb_len = sizeof(info->eti_xattr_buf);
         for (i = 0; i < count; i++) {
                 struct lu_object *ec_child, *child;
 
@@ -1648,16 +1640,19 @@ static int echo_setattr_object(const struct lu_env *env,
                         break;
                 }
 
-                CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
+                CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
                        PFID(lu_object_fid(child)));
-                rc = mo_attr_set(env, lu2md(child), ma);
+
+                sprintf(name, "%s.test1", XATTR_USER_PREFIX);
+                rc = mo_xattr_set(env, lu2md(child), buf, name,
+                                  LU_XATTR_CREATE);
                 if (rc) {
-                        CERROR("Can not getattr child "DFID": rc = %d\n",
+                        CERROR("Can not setattr child "DFID": rc = %d\n",
                                 PFID(lu_object_fid(child)), rc);
                         lu_object_put(env, ec_child);
                         break;
                 }
-                CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
+                CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
                        PFID(lu_object_fid(child)));
                 id++;
                 lu_object_put(env, ec_child);
@@ -1676,7 +1671,6 @@ static int echo_getattr_object(const struct lu_env *env,
         char                    *name = info->eti_name;
         struct md_attr          *ma = &info->eti_ma;
         struct lu_device        *ld = ed->ed_next;
-        int                      max_lmm_size;
         int                      rc = 0;
         int                      i;
 
@@ -1687,7 +1681,7 @@ static int echo_getattr_object(const struct lu_env *env,
         }
 
         memset(ma, 0, sizeof(*ma));
-        rc = echo_set_lmm_size(env, ld, ma, &max_lmm_size);
+        rc = echo_set_lmm_size(env, ld, ma);
         if (rc)
                 GOTO(out_free, rc);
 
@@ -1698,6 +1692,7 @@ static int echo_getattr_object(const struct lu_env *env,
         for (i = 0; i < count; i++) {
                 struct lu_object *ec_child, *child;
 
+                ma->ma_valid = 0;
                 echo_md_build_name(lname, name, id);
 
                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
@@ -1730,10 +1725,10 @@ static int echo_getattr_object(const struct lu_env *env,
         }
 
 out_free:
-        if (!strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME) &&
-             max_lmm_size > 0 && ma->ma_lmm)
-                OBD_FREE(ma->ma_lmm, max_lmm_size);
-
+        if (ma->ma_lmm_size > 0 && ma->ma_lmm != NULL)
+                OBD_FREE(ma->ma_lmm, ma->ma_lmm_size);
+        if (ma->ma_cookie_size > 0 && ma->ma_cookie != NULL)
+                OBD_FREE(ma->ma_cookie, ma->ma_cookie_size);
         return rc;
 }
 
@@ -1763,6 +1758,7 @@ static int echo_lookup_object(const struct lu_env *env,
 
                 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
+
                 rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
                 if (rc) {
                         CERROR("Can not lookup child %s: rc = %d\n", name, rc);
@@ -1803,6 +1799,14 @@ static int echo_md_destroy_internal(const struct lu_env *env,
         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,3,50,0)
+        /* After 2.4, MDT will send destroy RPC to OST directly, so no need
+         * this flag */
+        ma->ma_valid |= MA_FLAGS;
+        ma->ma_attr_flags |= MDS_UNLINK_DESTROY;
+#else
+#warning "Please remove this after 2.4 (LOD/OSP)"
+#endif
         rc = mdo_unlink(env, parent, lu2md(child), lname, ma);
         if (rc) {
                 CERROR("Can not unlink child %s: rc = %d\n",
@@ -1829,7 +1833,6 @@ static int echo_destroy_object(const struct lu_env *env,
         struct lu_device        *ld = ed->ed_next;
         struct lu_object        *parent;
         int                      rc = 0;
-        int                      max_lmm_size = 0;
         int                      i;
         ENTRY;
 
@@ -1844,16 +1847,9 @@ static int echo_destroy_object(const struct lu_env *env,
         ma->ma_need = MA_INODE;
         ma->ma_valid = 0;
 
-        rc = echo_set_lmm_size(env, ld, ma, &max_lmm_size);
+        rc = echo_set_lmm_size(env, ld, ma);
         if (rc)
                 GOTO(out_free, rc);
-
-        /*FIXME: Do not need logcookie for now, and check stripes*/
-        ma->ma_cookie = NULL;
-        ma->ma_cookie_size = 0;
-        ma->ma_need = MA_INODE | MA_LOV;
-        ma->ma_valid = 0;
-
         if (name != NULL) {
                 lname->ln_name = name;
                 lname->ln_namelen = namelen;
@@ -1866,6 +1862,8 @@ static int echo_destroy_object(const struct lu_env *env,
         for (i = 0; i < count; i++) {
                 char *tmp_name = info->eti_name;
 
+                ma->ma_need |= MA_LOV;
+                ma->ma_valid = 0;
                 echo_md_build_name(lname, tmp_name, id);
 
                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
@@ -1878,10 +1876,10 @@ static int echo_destroy_object(const struct lu_env *env,
         }
 
 out_free:
-        if (!strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME) &&
-             max_lmm_size > 0 && ma->ma_lmm)
-                OBD_FREE(ma->ma_lmm, max_lmm_size);
-
+        if (ma->ma_lmm_size > 0 && ma->ma_lmm != NULL)
+                OBD_FREE(ma->ma_lmm, ma->ma_lmm_size);
+        if (ma->ma_cookie_size > 0 && ma->ma_cookie != NULL)
+                OBD_FREE(ma->ma_cookie, ma->ma_cookie_size);
         RETURN(rc);
 }
 
@@ -1900,8 +1898,6 @@ struct lu_object *echo_resolve_path(const struct lu_env *env,
         ENTRY;
 
         /*Only support MDD layer right now*/
-        LASSERT(!strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME));
-
         rc = md->md_ops->mdo_root_get(env, md, fid);
         if (rc) {
                 CERROR("get root error: rc = %d\n", rc);
index 880a0db..3d88f72 100644 (file)
@@ -1435,8 +1435,6 @@ int jt_obd_md_common(int argc, char **argv, int cmd)
                         data.ioc_count = MD_STEP_COUNT;
                 }
 
-                child_base_id += data.ioc_count;
-                count += data.ioc_count;
                 if (cmd == ECHO_MD_CREATE || cmd == ECHO_MD_MKDIR) {
                         /*Allocate fids for the create */
                         rc = jt_obd_alloc_fids(&fid_space, &fid,
@@ -1448,6 +1446,10 @@ int jt_obd_md_common(int argc, char **argv, int cmd)
                         data.ioc_obdo1.o_seq = fid.f_seq;
                         data.ioc_obdo1.o_id = fid.f_oid;
                 }
+
+                child_base_id += data.ioc_count;
+                count += data.ioc_count;
+
                 memset(buf, 0, sizeof(rawbuf));
                 rc = obd_ioctl_pack(&data, &buf, sizeof(rawbuf));
                 if (rc) {