Whamcloud - gitweb
LU-14521 flr: delete mirror without volatile file
[fs/lustre-release.git] / lustre / lod / lod_object.c
index caa32fb..e861921 100644 (file)
@@ -3422,6 +3422,186 @@ static int lod_declare_layout_split(const struct lu_env *env,
        RETURN(rc);
 }
 
+static int lod_layout_declare_or_purge_mirror(const struct lu_env *env,
+                       struct dt_object *dt, const struct lu_buf *buf,
+                       struct thandle *th, bool declare)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
+       struct lov_comp_md_entry_v1 *entry;
+       struct lov_mds_md_v1 *lmm;
+       struct dt_object **sub_objs = NULL;
+       int rc = 0, i, k, array_count = 0;
+
+       ENTRY;
+
+       if (!declare) {
+               /* prepare sub-objects array */
+               for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+                       entry = &comp_v1->lcm_entries[i];
+
+                       if (!(entry->lcme_flags & LCME_FL_INIT))
+                               continue;
+
+                       lmm = (struct lov_mds_md_v1 *)
+                                       ((char *)comp_v1 + entry->lcme_offset);
+                       array_count += lmm->lmm_stripe_count;
+               }
+               OBD_ALLOC_PTR_ARRAY(sub_objs, array_count);
+               if (sub_objs == NULL)
+                       RETURN(-ENOMEM);
+       }
+
+       k = 0;  /* sub_objs index */
+       for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+               struct lov_ost_data_v1 *objs;
+               struct lu_object *o, *n;
+               struct dt_object *dto;
+               struct lu_device *nd;
+               struct lov_mds_md_v3 *v3;
+               __u32 idx;
+               int j;
+
+               entry = &comp_v1->lcm_entries[i];
+
+               if (!(entry->lcme_flags & LCME_FL_INIT))
+                       continue;
+
+               lmm = (struct lov_mds_md_v1 *)
+                               ((char *)comp_v1 + entry->lcme_offset);
+               v3 = (struct lov_mds_md_v3 *)lmm;
+               if (lmm->lmm_magic == LOV_MAGIC_V3)
+                       objs = &v3->lmm_objects[0];
+               else
+                       objs = &lmm->lmm_objects[0];
+
+               for (j = 0; j < lmm->lmm_stripe_count; j++) {
+                       idx = objs[j].l_ost_idx;
+                       rc = ostid_to_fid(&info->lti_fid, &objs[j].l_ost_oi,
+                                         idx);
+                       if (rc)
+                               GOTO(out, rc);
+
+                       if (!fid_is_sane(&info->lti_fid)) {
+                               CERROR("%s: sub-object insane fid "DFID"\n",
+                                      lod2obd(d)->obd_name,
+                                      PFID(&info->lti_fid));
+                               GOTO(out, rc = -EINVAL);
+                       }
+
+                       lod_getref(&d->lod_ost_descs);
+
+                       rc = validate_lod_and_idx(d, idx);
+                       if (unlikely(rc)) {
+                               lod_putref(d, &d->lod_ost_descs);
+                               GOTO(out, rc);
+                       }
+
+                       nd = &OST_TGT(d, idx)->ltd_tgt->dd_lu_dev;
+                       lod_putref(d, &d->lod_ost_descs);
+
+                       o = lu_object_find_at(env, nd, &info->lti_fid, NULL);
+                       if (IS_ERR(o))
+                               GOTO(out, rc = PTR_ERR(o));
+
+                       n = lu_object_locate(o->lo_header, nd->ld_type);
+                       if (unlikely(!n)) {
+                               lu_object_put(env, n);
+                               GOTO(out, rc = -ENOENT);
+                       }
+
+                       dto = container_of(n, struct dt_object, do_lu);
+
+                       if (declare) {
+                               rc = lod_sub_declare_destroy(env, dto, th);
+                               dt_object_put(env, dto);
+                               if (rc)
+                                       GOTO(out, rc);
+                       } else {
+                               /**
+                                * collect to-be-destroyed sub objects, the
+                                * reference would be released after actual
+                                * deletion.
+                                */
+                               sub_objs[k] = dto;
+                               k++;
+                       }
+               } /* for each stripe */
+       } /* for each component in the mirror */
+out:
+       if (!declare) {
+               i = 0;
+               if (!rc) {
+                       /* destroy the sub objects */
+                       for (; i < k; i++) {
+                               rc = lod_sub_destroy(env, sub_objs[i], th);
+                               if (rc)
+                                       break;
+                               dt_object_put(env, sub_objs[i]);
+                       }
+               }
+               /**
+                * if a sub object destroy failed, we'd release sub objects
+                * reference get from above sub_objs collection.
+                */
+               for (; i < k; i++)
+                       dt_object_put(env, sub_objs[i]);
+
+               OBD_FREE_PTR_ARRAY(sub_objs, array_count);
+       }
+
+       RETURN(rc);
+}
+
+/**
+ * Purge layouts, delete sub objects in the mirror stored in the vic_buf,
+ * and set the LOVEA with the layout from mbuf.
+ */
+static int lod_declare_layout_purge(const struct lu_env *env,
+               struct dt_object *dt, const struct lu_buf *buf,
+               struct thandle *th)
+{
+       struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
+       int rc;
+
+       ENTRY;
+
+       if (le32_to_cpu(comp_v1->lcm_magic) != LOV_MAGIC_COMP_V1) {
+               CERROR("%s: invalid layout magic %#x != %#x\n",
+                      lod2obd(d)->obd_name, le32_to_cpu(comp_v1->lcm_magic),
+                      LOV_MAGIC_COMP_V1);
+               RETURN(-EINVAL);
+       }
+
+       if (cpu_to_le32(LOV_MAGIC_COMP_V1) != LOV_MAGIC_COMP_V1)
+               lustre_swab_lov_comp_md_v1(comp_v1);
+
+       /* from now on, @buf contains cpu endian data */
+
+       if (comp_v1->lcm_mirror_count != 0) {
+               CERROR("%s: can only purge one mirror from "DFID"\n",
+                      lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu)));
+               RETURN(-EINVAL);
+       }
+
+       /* delcare sub objects deletion in the mirror stored in @buf */
+       rc = lod_layout_declare_or_purge_mirror(env, dt, buf, th, true);
+       RETURN(rc);
+}
+
+/* delete sub objects from the mirror stored in @buf */
+static int lod_layout_purge(const struct lu_env *env, struct dt_object *dt,
+                           const struct lu_buf *buf, struct thandle *th)
+{
+       int rc;
+
+       ENTRY;
+       rc = lod_layout_declare_or_purge_mirror(env, dt, buf, th, false);
+       RETURN(rc);
+}
+
 /**
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
@@ -3446,7 +3626,8 @@ static int lod_declare_xattr_set(const struct lu_env *env,
 
        mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
        if ((S_ISREG(mode) || mode == 0) &&
-           !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) &&
+           !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT |
+                   LU_XATTR_PURGE)) &&
            (strcmp(name, XATTR_NAME_LOV) == 0 ||
             strcmp(name, XATTR_LUSTRE_LOV) == 0)) {
                /*
@@ -3476,6 +3657,10 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
                        strcmp(name, XATTR_LUSTRE_LOV) == 0);
                rc = lod_declare_layout_split(env, dt, buf, th);
+       } else if (fl & LU_XATTR_PURGE) {
+               LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
+                       strcmp(name, XATTR_LUSTRE_LOV) == 0);
+               rc = lod_declare_layout_purge(env, dt, buf, th);
        } else if (S_ISREG(mode) &&
                   strlen(name) >= sizeof(XATTR_LUSTRE_LOV) + 3 &&
                   allowed_lustre_lov(name)) {
@@ -4599,6 +4784,8 @@ static int lod_xattr_set(const struct lu_env *env,
                        lod_striping_free(env, lod_dt_obj(dt));
 
                        rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
+               } else if (fl & LU_XATTR_PURGE) {
+                       rc = lod_layout_purge(env, dt, buf, th);
                } else if (dt_object_remote(dt)) {
                        /* This only happens during migration, see
                         * mdd_migrate_create(), in which Master MDT will