+/**
+ * Allocate a striping using round-robin algorithm.
+ *
+ * Allocates a new striping using round-robin algorithm. The function refreshes
+ * all the internal structures (statfs cache, array of available remote MDTs
+ * sorted with regard to MDS, etc). The caller should ensure nobody else is
+ * trying to create a striping on the object in parallel. All the internal
+ * structures (like pools, etc) are protected and no additional locking is
+ * required. The function succeeds even if a single stripe is allocated.
+ *
+ * Candidate MDTs are visited in two passes: the first pass skips targets whose
+ * statfs state is flagged OS_STATFS_DEGRADED, the second pass (only taken if
+ * the first one did not fill every requested slot) accepts them as well.
+ *
+ * \param[in] env		execution environment for this thread
+ * \param[in] lo		LOD object
+ * \param[in,out] stripes	stripe object array; slots starting at
+ *				\a stripe_idx are filled by this function.
+ *				The array is also handed to
+ *				lod_qos_mdt_in_use_init(), presumably so the
+ *				stripes allocated by the caller are accounted
+ *				for (confirm against that helper).
+ * \param[in] stripe_idx	index of the first slot in \a stripes to fill
+ * \param[in] stripe_count	total number of slots wanted in \a stripes
+ *
+ * \retval positive	index one past the last filled slot, i.e. stripe
+ *			objects allocated, including the first stripe
+ *			allocated outside
+ * \retval -EINPROGRESS	no stripe was allocated and at least one candidate
+ *			MDT was still connecting
+ * \retval -ENOSPC	if not enough MDTs are found
+ * \retval negative	negated errno for other failures
+ */
+int lod_mdt_alloc_rr(const struct lu_env *env, struct lod_object *lo,
+		     struct dt_object **stripes, u32 stripe_idx,
+		     u32 stripe_count)
+{
+	struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+	struct lu_tgt_descs *ltd = &lod->lod_mdt_descs;
+	struct lu_tgt_pool *pool;
+	struct lu_qos_rr *lqr;
+	struct lu_tgt_desc *mdt;
+	struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
+	struct lu_fid fid = { 0 };
+	struct dt_object *dto;
+	unsigned int pool_idx;
+	unsigned int i;
+	u32 saved_idx = stripe_idx;
+	u32 start_mdt;
+	u32 mdt_idx;
+	bool use_degraded = false;
+	int tgt_connecting = 0;
+	int rc;
+
+	ENTRY;
+
+	pool = &ltd->ltd_tgt_pool;
+	lqr = &ltd->ltd_qos.lq_rr;
+	rc = lod_qos_calc_rr(lod, ltd, pool, lqr);
+	if (rc)
+		RETURN(rc);
+
+	rc = lod_qos_mdt_in_use_init(env, ltd, stripe_idx, stripe_count, pool,
+				     stripes);
+	if (rc)
+		RETURN(rc);
+
+	down_read(&ltd->ltd_qos.lq_rw_sem);
+	spin_lock(&lqr->lqr_alloc);
+	if (--lqr->lqr_start_count <= 0) {
+		/* reseed counter expired: pick a fresh random start index */
+		lqr->lqr_start_idx = prandom_u32_max(pool->op_count);
+		lqr->lqr_start_count =
+			(LOV_CREATE_RESEED_MIN / max(pool->op_count, 1U) +
+			 LOV_CREATE_RESEED_MULT) * max(pool->op_count, 1U);
+	} else if (pool->op_count > 0 &&
+		   (stripe_count - 1 >= pool->op_count ||
+		    lqr->lqr_start_idx > pool->op_count)) {
+		/* If we have allocated from all of the tgts, slowly
+		 * precess the next start if the tgt/stripe count isn't
+		 * already doing this for us.
+		 *
+		 * The op_count check above keeps the modulo below from
+		 * dividing by zero on an empty pool; in that case the scan
+		 * loop does not run and -ENOSPC is returned.
+		 */
+		lqr->lqr_start_idx %= pool->op_count;
+		if (stripe_count - 1 > 1 &&
+		    (pool->op_count % (stripe_count - 1)) != 1)
+			++lqr->lqr_offset_idx;
+	}
+	/* remembered so that the degraded pass rescans from the same place */
+	start_mdt = lqr->lqr_start_idx;
+
+repeat_find:
+	QOS_DEBUG("want=%d start_idx=%d start_count=%d offset=%d active=%d count=%d\n",
+		  stripe_count - 1, lqr->lqr_start_idx, lqr->lqr_start_count,
+		  lqr->lqr_offset_idx, pool->op_count, pool->op_count);
+
+	/* lqr_alloc is held at the top of every iteration */
+	for (i = 0; i < pool->op_count && stripe_idx < stripe_count; i++) {
+		pool_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) %
+			   pool->op_count;
+		++lqr->lqr_start_idx;
+		mdt_idx = lqr->lqr_pool.op_array[pool_idx];
+		mdt = LTD_TGT(ltd, mdt_idx);
+
+		QOS_DEBUG("#%d strt %d act %d strp %d ary %d idx %d\n",
+			  i, lqr->lqr_start_idx, /* XXX: active*/ 0,
+			  stripe_idx, pool_idx, mdt_idx);
+
+		if (mdt_idx == LOV_QOS_EMPTY ||
+		    !test_bit(mdt_idx, ltd->ltd_tgt_bitmap))
+			continue;
+
+		/* do not put >1 objects on one MDT */
+		if (lod_qos_is_tgt_used(env, mdt_idx, stripe_idx))
+			continue;
+
+		rc = lod_is_tgt_usable(ltd, mdt);
+		if (rc) {
+			if (mdt->ltd_connecting)
+				tgt_connecting = 1;
+			continue;
+		}
+
+		/* try to use another OSP if this one is degraded */
+		if (mdt->ltd_statfs.os_state & OS_STATFS_DEGRADED &&
+		    !use_degraded) {
+			QOS_DEBUG("#%d: degraded\n", mdt_idx);
+			continue;
+		}
+
+		/* the spinlock is dropped around FID allocation and object
+		 * lookup and retaken on every path back into the loop
+		 */
+		spin_unlock(&lqr->lqr_alloc);
+
+		rc = dt_fid_alloc(env, mdt->ltd_tgt, &fid, NULL, NULL);
+		if (rc < 0) {
+			QOS_DEBUG("#%d: alloc FID failed: %d\n", mdt_idx, rc);
+			spin_lock(&lqr->lqr_alloc);
+			continue;
+		}
+
+		dto = dt_locate_at(env, mdt->ltd_tgt, &fid,
+				lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
+				&conf);
+
+		spin_lock(&lqr->lqr_alloc);
+		if (IS_ERR(dto)) {
+			QOS_DEBUG("can't alloc stripe on #%u: %d\n",
+				  mdt->ltd_index, (int) PTR_ERR(dto));
+
+			if (mdt->ltd_connecting)
+				tgt_connecting = 1;
+			continue;
+		}
+
+		lod_qos_tgt_in_use(env, stripe_idx, mdt_idx);
+		stripes[stripe_idx++] = dto;
+	}
+
+	if (!use_degraded && stripe_idx < stripe_count) {
+		/* Try again, allowing slower MDTs */
+		use_degraded = true;
+		lqr->lqr_start_idx = start_mdt;
+
+		tgt_connecting = 0;
+		goto repeat_find;
+	}
+	spin_unlock(&lqr->lqr_alloc);
+	up_read(&ltd->ltd_qos.lq_rw_sem);
+
+	if (stripe_idx > saved_idx)
+		/* at least one stripe is allocated */
+		RETURN(stripe_idx);
+
+	/* nobody provided us with a single object */
+	if (tgt_connecting)
+		RETURN(-EINPROGRESS);
+
+	RETURN(-ENOSPC);
+}
+
+/**
+ * Allocate a specific striping layout on a user defined set of OSTs.
+ *
+ * Allocates new striping using the OST index range provided by the data from
+ * the lmm_objects contained in the lov_user_md passed to this method. Full
+ * OSTs are not considered. The exact order of OSTs requested by the user
+ * is respected as much as possible depending on OST status. The number of
+ * stripes needed and stripe offset are taken from the object. If that number
+ * can not be met, then the function returns a failure and then it's the
+ * caller's responsibility to release the stripes allocated. All the internal
+ * structures are protected, but no concurrent allocation is allowed on the
+ * same objects.
+ *
+ * \param[in] env execution environment for this thread
+ * \param[in] lo LOD object
+ * \param[out] stripe striping created
+ * \param[out] ost_indices ost indices of striping created
+ * \param[in] th transaction handle
+ * \param[in] comp_idx index of ldo_comp_entries
+ *
+ * \retval 0 on success
+ * \retval -ENODEV OST index does not exist on file system
+ * \retval -EINVAL requested OST index is invalid
+ * \retval negative negated errno on error
+ */
+static int lod_alloc_ost_list(const struct lu_env *env, struct lod_object *lo,
+ struct dt_object **stripe, __u32 *ost_indices,
+ struct thandle *th, int comp_idx, __u64 reserve)
+{
+ struct lod_layout_component *lod_comp;
+ struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+ struct dt_object *o;
+ unsigned int array_idx = 0;
+ int stripe_count = 0;
+ int i;
+ int rc = -EINVAL;
+ ENTRY;
+
+ /* for specific OSTs layout */
+ LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
+ lod_comp = &lo->ldo_comp_entries[comp_idx];
+ LASSERT(lod_comp->llc_ostlist.op_array);
+ LASSERT(lod_comp->llc_ostlist.op_count);
+
+ rc = lod_qos_tgt_in_use_clear(env, lod_comp->llc_stripe_count);
+ if (rc < 0)
+ RETURN(rc);
+
+ if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT)
+ lod_comp->llc_stripe_offset =
+ lod_comp->llc_ostlist.op_array[0];
+
+ for (i = 0; i < lod_comp->llc_stripe_count; i++) {
+ if (lod_comp->llc_ostlist.op_array[i] ==
+ lod_comp->llc_stripe_offset) {
+ array_idx = i;
+ break;
+ }
+ }
+ if (i == lod_comp->llc_stripe_count) {
+ CDEBUG(D_OTHER,
+ "%s: start index %d not in the specified list of OSTs\n",
+ lod2obd(m)->obd_name, lod_comp->llc_stripe_offset);
+ RETURN(-EINVAL);
+ }
+
+ for (i = 0; i < lod_comp->llc_stripe_count;
+ i++, array_idx = (array_idx + 1) % lod_comp->llc_stripe_count) {
+ __u32 ost_idx = lod_comp->llc_ostlist.op_array[array_idx];
+
+ if (!test_bit(ost_idx, m->lod_ost_bitmap)) {
+ rc = -ENODEV;
+ break;
+ }
+
+ /* do not put >1 objects on a single OST, except for
+ * overstriping