Whamcloud - gitweb
LU-10499 pcc: use foreign layout for PCCRO on server side
[fs/lustre-release.git] / lustre / lod / lod_object.c
index b508c25..95f6984 100644 (file)
@@ -1183,6 +1183,9 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
        for (i = 0; i < lo->ldo_comp_cnt; i++) {
                lod_comp = &lo->ldo_comp_entries[i];
 
+               if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN)
+                       continue;
+
                if (lod_comp->llc_stripe == NULL)
                        continue;
 
@@ -2698,13 +2701,19 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
        for (i = 0; i < comp_cnt; i++) {
                __u16 stripe_count;
 
-               magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1;
-               stripe_count = lod_comp_entry_stripe_count(lo, i, is_dir);
-               if (!is_dir && is_composite)
-                       lod_comp_shrink_stripe_count(&comp_entries[i],
-                                                    &stripe_count);
-
-               size += lov_user_md_size(stripe_count, magic);
+               if (comp_entries[i].llc_magic == LOV_MAGIC_FOREIGN) {
+                       size += lov_foreign_md_size(comp_entries[i].llc_length);
+               } else {
+                       magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 :
+                                                          LOV_MAGIC_V1;
+                       stripe_count = lod_comp_entry_stripe_count(lo, i,
+                                                                  is_dir);
+                       if (!is_dir && is_composite)
+                               lod_comp_shrink_stripe_count(&comp_entries[i],
+                                                            &stripe_count);
+
+                       size += lov_user_md_size(stripe_count, magic);
+               }
                LASSERT(size % sizeof(__u64) == 0);
        }
        return size;
@@ -3298,8 +3307,9 @@ out:
  * Merge layouts to form a mirrored file.
  */
 static int lod_declare_layout_merge(const struct lu_env *env,
-               struct dt_object *dt, const struct lu_buf *mbuf,
-               struct thandle *th)
+                                   struct dt_object *dt,
+                                   const struct lu_buf *mbuf,
+                                   struct thandle *th)
 {
        struct lod_thread_info *info = lod_env_info(env);
        struct lu_attr *layout_attr = &info->lti_layout_attr;
@@ -3464,7 +3474,7 @@ static int lod_declare_layout_merge(const struct lu_env *env,
        }
 
        rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), buf,
-                                       XATTR_NAME_LOV, LU_XATTR_REPLACE, th);
+                                      XATTR_NAME_LOV, LU_XATTR_REPLACE, th);
 
 out:
        lu_buf_free(buf);
@@ -4834,6 +4844,9 @@ static int lod_layout_del_prep_layout(const struct lu_env *env,
                        continue;
                }
 
+               if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN)
+                       continue;
+
                lod_obj_set_pool(lo, i, NULL);
                if (lod_comp->llc_ostlist.op_array) {
                        OBD_FREE(lod_comp->llc_ostlist.op_array,
@@ -6363,6 +6376,11 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt,
                if (lod_comp_inited(lod_comp))
                        continue;
 
+               if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) {
+                       lod_comp_set_init(lod_comp);
+                       continue;
+               }
+
                if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
                        lod_comp_set_init(lod_comp);
 
@@ -7438,6 +7456,9 @@ out:
 /* If striping is already instantiated or INIT'ed DOM? */
 static bool lod_is_instantiation_needed(struct lod_layout_component *comp)
 {
+       if (comp->llc_magic == LOV_MAGIC_FOREIGN)
+               return false;
+
        return !(((lov_pattern(comp->llc_pattern) & LOV_PATTERN_MDT) &&
                  lod_comp_inited(comp)) || comp->llc_stripe);
 }
@@ -7652,6 +7673,8 @@ restart:
 
                                lod_comp->llc_flags |= LCME_FL_STALE;
                                lo->ldo_mirrors[i].lme_stale = 1;
+                               if (lod_is_hsm(lod_comp))
+                                       lod_comp->llc_foreign_flags |= HS_DIRTY;
                        }
                }
        }
@@ -7876,6 +7899,254 @@ static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo,
        return need_sync ? 0 : -EALREADY;
 }
 
+static struct lod_layout_component *
+lod_locate_comp_hsm(struct lod_object *lo, int *hsm_mirror_id)
+{
+       struct lod_layout_component *lod_comp = NULL;
+       int i;
+
+       if (!lo->ldo_is_composite)
+               return NULL;
+
+       for (i = 0; i < lo->ldo_mirror_count; i++) {
+               /*
+                * FIXME: In the current design, there is only one HSM
+                * mirror component in range [0, EOF] for a FLR file. This
+                * should be fixed to support multiple HSM mirror components
+                * with different HSM backend types and partial file ranges
+                * in the future.
+                */
+               if (lo->ldo_mirrors[i].lme_hsm) {
+                       __u16 start_idx;
+                       __u16 end_idx;
+
+                       if (hsm_mirror_id)
+                               *hsm_mirror_id = i;
+                       start_idx = lo->ldo_mirrors[i].lme_start;
+                       end_idx = lo->ldo_mirrors[i].lme_end;
+                       LASSERT(start_idx == end_idx);
+                       lod_comp = &lo->ldo_comp_entries[start_idx];
+                       LASSERT(lo->ldo_is_composite && lod_is_hsm(lod_comp) &&
+                               lod_comp->llc_extent.e_start == 0 &&
+                               lod_comp->llc_extent.e_end == LUSTRE_EOF);
+                       break;
+               }
+       }
+
+       return lod_comp;
+}
+
+static int lod_declare_pccro_set(const struct lu_env *env,
+                                struct dt_object *dt, struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lu_buf *buf = &info->lti_buf;
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_layout_component *lod_comp;
+       struct lod_layout_component *comp_array;
+       struct lod_mirror_entry *mirror_array;
+       __u16 mirror_id;
+       int hsm_mirror_id;
+       int mirror_cnt;
+       int new_cnt;
+       int rc;
+       int i;
+
+       ENTRY;
+
+       rc = lod_striping_load(env, lo);
+       if (rc)
+               RETURN(rc);
+
+       if (lo->ldo_flr_state & LCM_FL_PCC_RDONLY)
+               RETURN(-EALREADY);
+
+       rc = lod_layout_data_init(info, lo->ldo_comp_cnt);
+       if (rc)
+               RETURN(rc);
+
+       lod_comp = lod_locate_comp_hsm(lo, &hsm_mirror_id);
+       if (lod_comp) {
+               if (lod_comp->llc_foreign_flags & HS_PCCRO) {
+                       CDEBUG(D_LAYOUT, "bad HSM flags: %#x\n",
+                              lod_comp->llc_foreign_flags);
+                       RETURN(-EINVAL);
+               }
+
+               lod_obj_inc_layout_gen(lo);
+               lod_comp->llc_foreign_flags |= HS_PCCRO;
+               lod_comp->llc_foreign_flags &= ~HS_DIRTY;
+               lod_comp->llc_flags &= ~LCME_FL_STALE;
+               lo->ldo_mirrors[hsm_mirror_id].lme_stale = 0;
+               lo->ldo_flr_state |= LCM_FL_PCC_RDONLY;
+               buf->lb_len = lod_comp_md_size(lo, false);
+               rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+                                              buf, XATTR_NAME_LOV, 0, th);
+               RETURN(rc);
+       }
+
+       /*
+        * Create an new composite layout with only one HSM component.
+        * Field @lhm_archive_uuid is used to be the identifier within HSM
+        * backend for the archive copy. In the PCC case with a POSIX archive,
+        * This can just be the original inode FID. This is important because
+        * the inode FID may change due to layout swaps or migration to a new
+        * MDT, and we do not want that to cause problems with finding the copy
+        * in HSM/PCC.
+        */
+       mirror_cnt = lo->ldo_mirror_count + 1;
+       if (!lo->ldo_is_composite) {
+               LASSERT(lo->ldo_mirror_count == 0);
+               mirror_cnt++;
+       }
+
+       OBD_ALLOC_PTR_ARRAY(mirror_array, mirror_cnt);
+       if (mirror_array == NULL)
+               RETURN(-ENOMEM);
+
+       new_cnt = lo->ldo_comp_cnt + 1;
+       OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt);
+       if (comp_array == NULL) {
+               OBD_FREE_PTR_ARRAY(mirror_array, mirror_cnt);
+               RETURN(-ENOMEM);
+       }
+
+       mirror_id = 0;
+       for (i = 0; i < lo->ldo_comp_cnt; i++) {
+               lod_comp = &lo->ldo_comp_entries[i];
+
+               /*
+                * Add mirror from a non-flr file, create new mirror ID.
+                * Otherwise, keep existing mirror's component ID, used
+                * for mirror extension.
+                */
+               if (lo->ldo_mirror_count == 0 &&
+                   mirror_id_of(lod_comp->llc_id) == 0)
+                       lod_comp->llc_id = pflr_id(1, i + 1);
+
+               if (lod_comp->llc_id != LCME_ID_INVAL &&
+                   mirror_id_of(lod_comp->llc_id) > mirror_id)
+                       mirror_id = mirror_id_of(lod_comp->llc_id);
+
+               if (!lo->ldo_is_composite) {
+                       lod_comp->llc_extent.e_start = 0;
+                       lod_comp->llc_extent.e_end = LUSTRE_EOF;
+                       lod_comp_set_init(lod_comp);
+               }
+       }
+
+       memcpy(comp_array, lo->ldo_comp_entries,
+              sizeof(*comp_array) * lo->ldo_comp_cnt);
+
+       lod_comp = &comp_array[new_cnt - 1];
+       lod_comp->llc_magic = LOV_MAGIC_FOREIGN;
+       lod_comp->llc_extent.e_start = 0;
+       lod_comp->llc_extent.e_end = LUSTRE_EOF;
+       lod_comp->llc_length = sizeof(struct lov_hsm_base);
+       lod_comp->llc_type = LU_FOREIGN_TYPE_PCCRO;
+       lod_comp->llc_foreign_flags = HS_EXISTS | HS_ARCHIVED | HS_PCCRO;
+       memset(&lod_comp->llc_hsm, 0, sizeof(lod_comp->llc_hsm));
+
+       if (lo->ldo_mirrors)
+               OBD_FREE_PTR_ARRAY(lo->ldo_mirrors, lo->ldo_mirror_count);
+       OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt);
+
+       /*
+        * The @ldo_mirror will be refilled by lod_fill_mirrors() when
+        * call lod_striped_create() for layout change.
+        */
+       lo->ldo_mirrors = mirror_array;
+       lo->ldo_mirror_count = mirror_cnt;
+       lo->ldo_comp_entries = comp_array;
+       lo->ldo_comp_cnt = new_cnt;
+       lo->ldo_is_composite = 1;
+
+       ++mirror_id;
+       lod_comp->llc_id = LCME_ID_INVAL;
+       lod_comp->llc_id = lod_gen_component_id(lo, mirror_id, new_cnt - 1);
+
+       if (lo->ldo_flr_state == LCM_FL_NONE)
+               lo->ldo_flr_state = LCM_FL_RDONLY;
+       lo->ldo_flr_state |= LCM_FL_PCC_RDONLY;
+       buf->lb_len = lod_comp_md_size(lo, false);
+       rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+                                      buf, XATTR_NAME_LOV, 0, th);
+       if (rc)
+               lod_striping_free(env, lo);
+
+       RETURN(rc);
+}
+
+/*
+ * TODO: When clear LCM_FL_PCC_RDONLY flag from the layouts, it means the file
+ * is going to be modified. Currently it needs two RPCs: first one is to clear
+ * LCM_FL_PCC_RDONLY flag; the second one is to pick primary mirror and mark
+ * the file as LCM_FL_WRITE_PENDING.
+ * These two RPCs can be combined in one RPC call.
+ */
+static int lod_declare_pccro_clear(const struct lu_env *env,
+                                  struct dt_object *dt, struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_layout_component *lod_comp;
+       int rc;
+
+       ENTRY;
+
+       rc = lod_striping_load(env, lo);
+       if (rc)
+               RETURN(rc);
+
+       if (!(lo->ldo_flr_state & LCM_FL_PCC_RDONLY))
+               RETURN(-EALREADY);
+
+       rc = lod_layout_data_init(info, lo->ldo_comp_cnt);
+       if (rc)
+               RETURN(rc);
+
+       lod_comp = lod_locate_comp_hsm(lo, NULL);
+       if (lod_comp == NULL) {
+               CDEBUG(D_LAYOUT, "Not found any HSM component\n");
+               GOTO(out, rc = -EINVAL);
+       }
+
+       lod_comp->llc_foreign_flags &= ~HS_PCCRO;
+       lo->ldo_flr_state &= ~LCM_FL_PCC_RDONLY;
+       lod_obj_inc_layout_gen(lo);
+       info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+       rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+                                      &info->lti_buf, XATTR_NAME_LOV, 0, th);
+out:
+       if (rc)
+               lod_striping_free(env, lo);
+
+       RETURN(rc);
+}
+
+static int lod_declare_update_pccro(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   struct md_layout_change *mlc,
+                                   struct thandle *th)
+{
+       struct layout_intent *intent = mlc->mlc_intent;
+       int rc;
+
+       switch (intent->li_opc) {
+       case LAYOUT_INTENT_PCCRO_SET:
+               rc = lod_declare_pccro_set(env, dt, th);
+               break;
+       case LAYOUT_INTENT_PCCRO_CLEAR:
+               rc = lod_declare_pccro_clear(env, dt, th);
+               break;
+       default:
+               rc = -EOPNOTSUPP;
+               break;
+       }
+
+       return rc;
+}
+
 static int lod_declare_update_rdonly(const struct lu_env *env,
                struct lod_object *lo, struct md_layout_change *mlc,
                struct thandle *th)
@@ -8034,6 +8305,8 @@ static int lod_declare_update_write_pending(const struct lu_env *env,
                        continue;
                if (lo->ldo_mirrors[i].lme_prefer == 0)
                        continue;
+               if (lo->ldo_mirrors[i].lme_hsm)
+                       continue;
 
                primary = i;
                break;
@@ -8888,6 +9161,19 @@ static int lod_declare_layout_change(const struct lu_env *env,
            dt_object_remote(dt_object_child(dt)))
                RETURN(-EINVAL);
 
+       if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+               struct layout_intent *intent = mlc->mlc_intent;
+
+               if (intent->li_opc == LAYOUT_INTENT_PCCRO_SET ||
+                   intent->li_opc == LAYOUT_INTENT_PCCRO_CLEAR) {
+                       if (!S_ISREG(dt->do_lu.lo_header->loh_attr))
+                               RETURN(-EINVAL);
+
+                       rc = lod_declare_update_pccro(env, dt, mlc, th);
+                       RETURN(rc);
+               }
+       }
+
        rc = lod_striping_load(env, lo);
        if (rc)
                GOTO(out, rc);
@@ -9170,6 +9456,9 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo)
                        /* free lod_layout_component::llc_stripe array */
                        lod_comp = &lo->ldo_comp_entries[i];
 
+                       /* HSM layout component */
+                       if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN)
+                               continue;
                        if (lod_comp->llc_stripe == NULL)
                                continue;
                        LASSERT(lod_comp->llc_stripes_allocated != 0);