+static struct lod_layout_component *
+lod_locate_comp_hsm(struct lod_object *lo, int *hsm_mirror_id)
+{
+ struct lod_layout_component *lod_comp = NULL;
+ int i;
+
+ if (!lo->ldo_is_composite)
+ return NULL;
+
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ /*
+ * FIXME: In the current design, there is only one HSM
+ * mirror component in range [0, EOF] for a FLR file. This
+ * should be fixed to support multiple HSM mirror components
+ * with different HSM backend types and partial file ranges
+ * in the future.
+ */
+ if (lo->ldo_mirrors[i].lme_hsm) {
+ __u16 start_idx;
+ __u16 end_idx;
+
+ if (hsm_mirror_id)
+ *hsm_mirror_id = i;
+ start_idx = lo->ldo_mirrors[i].lme_start;
+ end_idx = lo->ldo_mirrors[i].lme_end;
+ LASSERT(start_idx == end_idx);
+ lod_comp = &lo->ldo_comp_entries[start_idx];
+ LASSERT(lo->ldo_is_composite && lod_is_hsm(lod_comp) &&
+ lod_comp->llc_extent.e_start == 0 &&
+ lod_comp->llc_extent.e_end == LUSTRE_EOF);
+ break;
+ }
+ }
+
+ return lod_comp;
+}
+
+static int lod_declare_pccro_set(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_buf *buf = &info->lti_buf;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_layout_component *lod_comp;
+ struct lod_layout_component *comp_array;
+ struct lod_mirror_entry *mirror_array;
+ __u16 mirror_id;
+ int hsm_mirror_id;
+ int mirror_cnt;
+ int new_cnt;
+ int rc;
+ int i;
+
+ ENTRY;
+
+ rc = lod_striping_load(env, lo);
+ if (rc)
+ RETURN(rc);
+
+ if (lo->ldo_flr_state & LCM_FL_PCC_RDONLY)
+ RETURN(-EALREADY);
+
+ rc = lod_layout_data_init(info, lo->ldo_comp_cnt);
+ if (rc)
+ RETURN(rc);
+
+ lod_comp = lod_locate_comp_hsm(lo, &hsm_mirror_id);
+ if (lod_comp) {
+ if (lod_comp->llc_foreign_flags & HS_PCCRO) {
+ CDEBUG(D_LAYOUT, "bad HSM flags: %#x\n",
+ lod_comp->llc_foreign_flags);
+ RETURN(-EINVAL);
+ }
+
+ lod_obj_inc_layout_gen(lo);
+ lod_comp->llc_foreign_flags |= HS_PCCRO;
+ lod_comp->llc_foreign_flags &= ~HS_DIRTY;
+ lod_comp->llc_flags &= ~LCME_FL_STALE;
+ lo->ldo_mirrors[hsm_mirror_id].lme_stale = 0;
+ lo->ldo_flr_state |= LCM_FL_PCC_RDONLY;
+ buf->lb_len = lod_comp_md_size(lo, false);
+ rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+ buf, XATTR_NAME_LOV, 0, th);
+ RETURN(rc);
+ }
+
+ /*
+ * Create an new composite layout with only one HSM component.
+ * Field @lhm_archive_uuid is used to be the identifier within HSM
+ * backend for the archive copy. In the PCC case with a POSIX archive,
+ * This can just be the original inode FID. This is important because
+ * the inode FID may change due to layout swaps or migration to a new
+ * MDT, and we do not want that to cause problems with finding the copy
+ * in HSM/PCC.
+ */
+ mirror_cnt = lo->ldo_mirror_count + 1;
+ if (!lo->ldo_is_composite) {
+ LASSERT(lo->ldo_mirror_count == 0);
+ mirror_cnt++;
+ }
+
+ OBD_ALLOC_PTR_ARRAY(mirror_array, mirror_cnt);
+ if (mirror_array == NULL)
+ RETURN(-ENOMEM);
+
+ new_cnt = lo->ldo_comp_cnt + 1;
+ OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt);
+ if (comp_array == NULL) {
+ OBD_FREE_PTR_ARRAY(mirror_array, mirror_cnt);
+ RETURN(-ENOMEM);
+ }
+
+ mirror_id = 0;
+ for (i = 0; i < lo->ldo_comp_cnt; i++) {
+ lod_comp = &lo->ldo_comp_entries[i];
+
+ /*
+ * Add mirror from a non-flr file, create new mirror ID.
+ * Otherwise, keep existing mirror's component ID, used
+ * for mirror extension.
+ */
+ if (lo->ldo_mirror_count == 0 &&
+ mirror_id_of(lod_comp->llc_id) == 0)
+ lod_comp->llc_id = pflr_id(1, i + 1);
+
+ if (lod_comp->llc_id != LCME_ID_INVAL &&
+ mirror_id_of(lod_comp->llc_id) > mirror_id)
+ mirror_id = mirror_id_of(lod_comp->llc_id);
+
+ if (!lo->ldo_is_composite) {
+ lod_comp->llc_extent.e_start = 0;
+ lod_comp->llc_extent.e_end = LUSTRE_EOF;
+ lod_comp_set_init(lod_comp);
+ }
+ }
+
+ memcpy(comp_array, lo->ldo_comp_entries,
+ sizeof(*comp_array) * lo->ldo_comp_cnt);
+
+ lod_comp = &comp_array[new_cnt - 1];
+ lod_comp->llc_magic = LOV_MAGIC_FOREIGN;
+ lod_comp->llc_extent.e_start = 0;
+ lod_comp->llc_extent.e_end = LUSTRE_EOF;
+ lod_comp->llc_length = sizeof(struct lov_hsm_base);
+ lod_comp->llc_type = LU_FOREIGN_TYPE_PCCRO;
+ lod_comp->llc_foreign_flags = HS_EXISTS | HS_ARCHIVED | HS_PCCRO;
+ memset(&lod_comp->llc_hsm, 0, sizeof(lod_comp->llc_hsm));
+
+ if (lo->ldo_mirrors)
+ OBD_FREE_PTR_ARRAY(lo->ldo_mirrors, lo->ldo_mirror_count);
+ OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt);
+
+ /*
+ * The @ldo_mirror will be refilled by lod_fill_mirrors() when
+ * call lod_striped_create() for layout change.
+ */
+ lo->ldo_mirrors = mirror_array;
+ lo->ldo_mirror_count = mirror_cnt;
+ lo->ldo_comp_entries = comp_array;
+ lo->ldo_comp_cnt = new_cnt;
+ lo->ldo_is_composite = 1;
+
+ ++mirror_id;
+ lod_comp->llc_id = LCME_ID_INVAL;
+ lod_comp->llc_id = lod_gen_component_id(lo, mirror_id, new_cnt - 1);
+
+ if (lo->ldo_flr_state == LCM_FL_NONE)
+ lo->ldo_flr_state = LCM_FL_RDONLY;
+ lo->ldo_flr_state |= LCM_FL_PCC_RDONLY;
+ buf->lb_len = lod_comp_md_size(lo, false);
+ rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+ buf, XATTR_NAME_LOV, 0, th);
+ if (rc)
+ lod_striping_free(env, lo);
+
+ RETURN(rc);
+}
+
+/*
+ * TODO: When clear LCM_FL_PCC_RDONLY flag from the layouts, it means the file
+ * is going to be modified. Currently it needs two RPCs: first one is to clear
+ * LCM_FL_PCC_RDONLY flag; the second one is to pick primary mirror and mark
+ * the file as LCM_FL_WRITE_PENDING.
+ * These two RPCs can be combined in one RPC call.
+ */
+static int lod_declare_pccro_clear(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_layout_component *lod_comp;
+ int rc;
+
+ ENTRY;
+
+ rc = lod_striping_load(env, lo);
+ if (rc)
+ RETURN(rc);
+
+ if (!(lo->ldo_flr_state & LCM_FL_PCC_RDONLY))
+ RETURN(-EALREADY);
+
+ rc = lod_layout_data_init(info, lo->ldo_comp_cnt);
+ if (rc)
+ RETURN(rc);
+
+ lod_comp = lod_locate_comp_hsm(lo, NULL);
+ if (lod_comp == NULL) {
+ CDEBUG(D_LAYOUT, "Not found any HSM component\n");
+ GOTO(out, rc = -EINVAL);
+ }
+
+ lod_comp->llc_foreign_flags &= ~HS_PCCRO;
+ lo->ldo_flr_state &= ~LCM_FL_PCC_RDONLY;
+ lod_obj_inc_layout_gen(lo);
+ info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+ rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+ &info->lti_buf, XATTR_NAME_LOV, 0, th);
+out:
+ if (rc)
+ lod_striping_free(env, lo);
+
+ RETURN(rc);
+}
+
+static int lod_declare_update_pccro(const struct lu_env *env,
+ struct dt_object *dt,
+ struct md_layout_change *mlc,
+ struct thandle *th)
+{
+ struct layout_intent *intent = mlc->mlc_intent;
+ int rc;
+
+ switch (intent->lai_opc) {
+ case LAYOUT_INTENT_PCCRO_SET:
+ rc = lod_declare_pccro_set(env, dt, th);
+ break;
+ case LAYOUT_INTENT_PCCRO_CLEAR:
+ rc = lod_declare_pccro_clear(env, dt, th);
+ break;
+ default:
+ rc = -EOPNOTSUPP;
+ break;
+ }
+
+ return rc;
+}
+