+ int stripe;
+ int rc = -EIO;
+ ENTRY;
+
+ CDEBUG(D_QOS, "Check and create on idx %d \n", ost_idx);
+ if (!lov->lov_tgts[ost_idx] ||
+ !lov->lov_tgts[ost_idx]->ltd_active)
+ RETURN(rc);
+
+ /* check if objects has been created on this ost */
+ for (stripe = 0; stripe < lsm->lsm_stripe_count; stripe++) {
+ /* already have object at this stripe */
+ if (ost_idx == lsm->lsm_oinfo[stripe]->loi_ost_idx)
+ break;
+ }
+
+ if (stripe >= lsm->lsm_stripe_count) {
+ req->rq_idx = ost_idx;
+ rc = obd_create(lov->lov_tgts[ost_idx]->ltd_exp,
+ req->rq_oi.oi_oa, &req->rq_oi.oi_md,
+ oti);
+ }
+ RETURN(rc);
+}
+
+/*
+ * Retry the object creation for req on another OST.
+ *
+ * When lov_find_pool() resolves lsm->lsm_pool_name, only the OSTs listed in
+ * that pool are tried, in array order, and the outcome is returned without
+ * looking outside the pool.  Otherwise every target index below
+ * lov->desc.ld_tgt_count is tried once, starting at
+ * (req->rq_idx + lsm->lsm_stripe_count) and wrapping around.  Either search
+ * stops at the first OST for which lov_check_and_create_object() returns 0.
+ *
+ * Returns 0 on success, otherwise the result of the last attempt (-EIO when
+ * no OST could be tried at all).
+ */
+int qos_remedy_create(struct lov_request_set *set, struct lov_request *req)
+{
+        struct lov_stripe_md *lsm = set->set_oi->oi_md;
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        unsigned ost_idx, ost_count, i;
+        struct pool_desc *pool;
+        struct ost_pool *osts;
+        int rc = -EIO;
+        ENTRY;
+
+        /* First check whether we can create the objects on the pool */
+        pool = lov_find_pool(lov, lsm->lsm_pool_name);
+        if (pool != NULL) {
+                cfs_down_read(&pool_tgt_rw_sem(pool));
+                osts = &(pool->pool_obds);
+                ost_count = osts->op_count;
+                for (i = 0; i < ost_count; i++) {
+                        /* Fetch the index inside the body, not in the
+                         * for-increment: the increment form tried OST 0
+                         * instead of op_array[0] on the first round and
+                         * read op_array[ost_count] after the last one. */
+                        ost_idx = osts->op_array[i];
+                        rc = lov_check_and_create_object(lov, ost_idx, lsm,
+                                                         req, set->set_oti);
+                        if (rc == 0)
+                                break;
+                }
+                cfs_up_read(&pool_tgt_rw_sem(pool));
+                lov_pool_putref(pool);
+                RETURN(rc);
+        }
+
+        ost_count = lov->desc.ld_tgt_count;
+        /* Nothing to try, and the modulo below must not divide by zero */
+        if (ost_count == 0)
+                RETURN(rc);
+
+        /* Then check whether we can create the objects on other OSTs */
+        ost_idx = (req->rq_idx + lsm->lsm_stripe_count) % ost_count;
+        for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
+                rc = lov_check_and_create_object(lov, ost_idx, lsm, req,
+                                                 set->set_oti);
+
+                if (rc == 0)
+                        break;
+        }
+
+        RETURN(rc);
+}
+
+/*
+ * Smallest stripe count that still counts as a successful allocation.
+ *
+ * With LOV_USES_DEFAULT_STRIPE set in flags the result is stripe_cnt minus
+ * a quarter of it (integer division); without the flag it is stripe_cnt
+ * unchanged.
+ */
+static int min_stripe_count(int stripe_cnt, int flags)
+{
+        /* flag clear: the minimum is the requested count itself */
+        if (!(flags & LOV_USES_DEFAULT_STRIPE))
+                return stripe_cnt;
+
+        /* flag set: tolerate coming up short by stripe_cnt / 4 */
+        return stripe_cnt - (stripe_cnt / 4);
+}
+
+#define LOV_CREATE_RESEED_MULT 30 /* per-OST term of the reseed interval computed in alloc_rr() */
+#define LOV_CREATE_RESEED_MIN 2000 /* fixed term of the reseed interval computed in alloc_rr() */
+/* Allocate objects on osts with round-robin algorithm
+ *
+ * The OST list and round-robin state come from the pool named poolname, or
+ * from lov->lov_packed / lov->lov_qos.lq_rr when lov_find_pool() returns
+ * NULL.  Accepted OST indices are written to idx_arr, at most *stripe_cnt of
+ * them, and the number actually written is stored back into *stripe_cnt.
+ *
+ * Up to three search passes are made.  A pass skips any OST whose
+ * obd_precreate() result is above the pass number (0, 1, 2); another pass is
+ * started only while fewer than min_stripe_count(*stripe_cnt, flags) OSTs
+ * have been collected.
+ *
+ * Returns the result of qos_calc_rr().  On a non-zero result nothing is
+ * allocated and *stripe_cnt is left alone; on 0 the search has run, and
+ * *stripe_cnt may come back smaller than requested (0 included).
+ */
+static int alloc_rr(struct lov_obd *lov, int *idx_arr, int *stripe_cnt,
+ char *poolname, int flags)
+{
+ unsigned array_idx;
+ int i, rc, *idx_pos;
+ __u32 ost_idx;
+ int ost_start_idx_temp;
+ int speed = 0; /* highest obd_precreate() result accepted in the current pass */
+ int stripe_cnt_min = min_stripe_count(*stripe_cnt, flags);
+ struct pool_desc *pool;
+ struct ost_pool *osts;
+ struct lov_qos_rr *lqr;
+ ENTRY;
+ /* Use the named pool's OST list and round-robin state when the pool
+ * is found, the lov-wide ones otherwise. */
+ pool = lov_find_pool(lov, poolname);
+ if (pool == NULL) {
+ osts = &(lov->lov_packed);
+ lqr = &(lov->lov_qos.lq_rr);
+ } else {
+ cfs_down_read(&pool_tgt_rw_sem(pool)); /* released at out: */
+ osts = &(pool->pool_obds);
+ lqr = &(pool->pool_rr);
+ }
+
+ rc = qos_calc_rr(lov, osts, lqr);
+ if (rc)
+ GOTO(out, rc);
+ /* Every call counts lqr_start_count down.  Once it is used up, pick a
+ * new random start index and rearm the counter to about
+ * LOV_CREATE_RESEED_MIN + LOV_CREATE_RESEED_MULT * op_count calls.
+ * NOTE(review): the modulo on the next line divides by osts->op_count
+ * while the statement after it guards the same value with
+ * max(..., 1U) -- confirm op_count cannot be 0 here. */
+ if (--lqr->lqr_start_count <= 0) {
+ lqr->lqr_start_idx = cfs_rand() % osts->op_count;
+ lqr->lqr_start_count =
+ (LOV_CREATE_RESEED_MIN / max(osts->op_count, 1U) +
+ LOV_CREATE_RESEED_MULT) * max(osts->op_count, 1U);
+ } else if (stripe_cnt_min >= osts->op_count ||
+ lqr->lqr_start_idx > osts->op_count) {
+ /* If we have allocated from all of the OSTs, slowly
+ * precess the next start if the OST/stripe count isn't
+ * already doing this for us. */
+ lqr->lqr_start_idx %= osts->op_count;
+ if (*stripe_cnt > 1 && (osts->op_count % (*stripe_cnt)) != 1)
+ ++lqr->lqr_offset_idx;
+ }
+ cfs_down_read(&lov->lov_qos.lq_rw_sem); /* held across all passes, dropped before out: */
+ ost_start_idx_temp = lqr->lqr_start_idx; /* saved so a retry pass restarts from the same index */
+
+repeat_find:
+ array_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) % osts->op_count;
+ idx_pos = idx_arr; /* each pass refills idx_arr from the beginning */
+#ifdef QOS_DEBUG
+ CDEBUG(D_QOS, "pool '%s' want %d startidx %d startcnt %d offset %d "
+ "active %d count %d arrayidx %d\n", poolname,
+ *stripe_cnt, lqr->lqr_start_idx, lqr->lqr_start_count,
+ lqr->lqr_offset_idx, osts->op_count, osts->op_count, array_idx);
+#endif
+ /* Visit every slot once, starting at array_idx and wrapping around.
+ * lqr_start_idx advances for each slot visited, used or not.
+ * NOTE(review): the bounds come from osts->op_count but the entries
+ * are read from lqr->lqr_pool.op_array; presumably qos_calc_rr()
+ * keeps the two the same size -- confirm. */
+ for (i = 0; i < osts->op_count;
+ i++, array_idx=(array_idx + 1) % osts->op_count) {
+ ++lqr->lqr_start_idx;
+ ost_idx = lqr->lqr_pool.op_array[array_idx];
+#ifdef QOS_DEBUG
+ CDEBUG(D_QOS, "#%d strt %d act %d strp %d ary %d idx %d\n",
+ i, lqr->lqr_start_idx,
+ ((ost_idx != LOV_QOS_EMPTY) && lov->lov_tgts[ost_idx]) ?
+ lov->lov_tgts[ost_idx]->ltd_active : 0,
+ idx_pos - idx_arr, array_idx, ost_idx);
+#endif
+ if ((ost_idx == LOV_QOS_EMPTY) || !lov->lov_tgts[ost_idx] ||
+ !lov->lov_tgts[ost_idx]->ltd_active)
+ continue;
+
+ /* Fail Check before osc_precreate() is called
+ so we can only 'fail' single OSC. */
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
+ continue;
+
+ /* Drop slow OSCs if we can: skip this one when obd_precreate()
+ * returns more than the current pass allows */
+ if (obd_precreate(lov->lov_tgts[ost_idx]->ltd_exp) > speed)
+ continue;
+
+ *idx_pos = ost_idx;
+ idx_pos++;
+ /* We have enough stripes */
+ if (idx_pos - idx_arr == *stripe_cnt)
+ break;
+ }
+ if ((speed < 2) && (idx_pos - idx_arr < stripe_cnt_min)) {
+ /* Try again, allowing slower OSCs; at most two retries, each
+ * restarting from the start index saved before the first pass */
+ speed++;
+ lqr->lqr_start_idx = ost_start_idx_temp;
+ goto repeat_find;
+ }
+
+ cfs_up_read(&lov->lov_qos.lq_rw_sem);
+
+ *stripe_cnt = idx_pos - idx_arr; /* report how many OSTs were actually chosen */
+out:
+ if (pool != NULL) {
+ cfs_up_read(&pool_tgt_rw_sem(pool));
+ /* put back ref got by lov_find_pool() */
+ lov_pool_putref(pool);
+ }
+
+ RETURN(rc);
+}
+
+/*
+ * Stripe allocation anchored at a caller-chosen OST.
+ *
+ * The OST list comes from the pool named in lsm->lsm_pool_name, or from
+ * lov->lov_packed when lov_find_pool() returns NULL.  Starting at the slot
+ * that holds lsm->lsm_oinfo[0]->loi_ost_idx, the list is walked once around
+ * and accepted OST indices are stored in idx_arr until
+ * lsm->lsm_stripe_count of them have been collected.  Up to three passes
+ * are made, each tolerating a higher obd_precreate() result than the last.
+ *
+ * Returns 0 when enough OSTs were found, -EINVAL when the start OST is not
+ * in the list, -EFBIG when three passes could not satisfy the request.
+ */
+static int alloc_specific(struct lov_obd *lov, struct lov_stripe_md *lsm,
+                          int *idx_arr)
+{
+        struct pool_desc *pool;
+        struct ost_pool *osts;
+        unsigned ost_count, slot, target;
+        int pass, visited, rc;
+        int nfound = 0;
+        ENTRY;
+
+        pool = lov_find_pool(lov, lsm->lsm_pool_name);
+        if (pool != NULL) {
+                cfs_down_read(&pool_tgt_rw_sem(pool));
+                osts = &(pool->pool_obds);
+        } else {
+                osts = &(lov->lov_packed);
+        }
+
+        ost_count = osts->op_count;
+
+        /* pass N accepts OSTs whose obd_precreate() result is <= N */
+        for (pass = 0; pass <= 2; pass++) {
+                /* find the slot of the requested first OST */
+                for (slot = 0; slot < ost_count; slot++) {
+                        if (osts->op_array[slot] ==
+                            lsm->lsm_oinfo[0]->loi_ost_idx)
+                                break;
+                }
+                if (slot == ost_count) {
+                        CERROR("Start index %d not found in pool '%s'\n",
+                               lsm->lsm_oinfo[0]->loi_ost_idx,
+                               lsm->lsm_pool_name);
+                        GOTO(out, rc = -EINVAL);
+                }
+
+                /* every pass fills idx_arr from scratch */
+                nfound = 0;
+                for (visited = 0; visited < ost_count;
+                     visited++, slot = (slot + 1) % ost_count) {
+                        target = osts->op_array[slot];
+
+                        if (!lov->lov_tgts[target] ||
+                            !lov->lov_tgts[target]->ltd_active)
+                                continue;
+
+                        /* The fail check comes ahead of obd_precreate() so
+                         * that exactly one OSC (index 0) can be failed. */
+                        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) &&
+                            target == 0)
+                                continue;
+
+                        /* A slow OSC is skipped unless it is the requested
+                         * start OST (visited == 0); on the last pass
+                         * (pass 2) that exemption no longer applies. */
+                        if ((obd_precreate(lov->lov_tgts[target]->ltd_exp) >
+                             pass) && (visited != 0 || pass >= 2))
+                                continue;
+
+                        idx_arr[nfound++] = target;
+                        /* collected everything that was asked for */
+                        if (nfound == lsm->lsm_stripe_count)
+                                GOTO(out, rc = 0);
+                }
+        }
+
+        /* The striping parameters were given explicitly, so falling short
+         * is an error: the caller's memory cannot be reallocated (it might
+         * be part of a larger array).
+         *
+         * We can only get here if lsm_stripe_count was originally > 1.
+         */
+        CERROR("can't lstripe objid "LPX64": have %d want %u\n",
+               lsm->lsm_object_id, nfound,
+               lsm->lsm_stripe_count);
+        rc = -EFBIG;
+out:
+        if (pool != NULL) {
+                cfs_up_read(&pool_tgt_rw_sem(pool));
+                /* drop the reference taken by lov_find_pool() */
+                lov_pool_putref(pool);
+        }
+
+        RETURN(rc);
+}
+
+/* Alloc objects on osts with optimization based on:
+ *  - free space
+ *  - network resources (shared OSS's)
+ *
+ * Candidate OSTs come from the pool named poolname, or from lov->lov_packed
+ * when lov_find_pool() returns NULL.  Each active candidate whose
+ * obd_precreate() result is at most 2 is marked usable and weighted with
+ * qos_calc_weight(); OSTs are then drawn one at a time by weighted random
+ * selection and their indices stored in idx_arr.  *stripe_cnt is lowered to
+ * the number of usable OSTs when that is smaller, provided it is still at
+ * least min_stripe_count(*stripe_cnt, flags).
+ *
+ * Whenever this path ends with -EAGAIN the request is handed to alloc_rr()
+ * and that function's result is returned instead.
+ *
+ * Returns 0 on success, -EINVAL when the minimum stripe count is below 1,
+ * the error from qos_calc_ppo(), or whatever alloc_rr() returns.
+ */
+static int alloc_qos(struct obd_export *exp, int *idx_arr, int *stripe_cnt,
+                     char *poolname, int flags)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        __u64 total_weight = 0;
+        int nfound, good_osts, i, rc = 0;
+        int stripe_cnt_min = min_stripe_count(*stripe_cnt, flags);
+        struct pool_desc *pool;
+        struct ost_pool *osts;
+        struct lov_qos_rr *lqr;
+        ENTRY;
+
+        if (stripe_cnt_min < 1)
+                RETURN(-EINVAL);
+
+        pool = lov_find_pool(lov, poolname);
+        if (pool == NULL) {
+                osts = &(lov->lov_packed);
+                lqr = &(lov->lov_qos.lq_rr);
+        } else {
+                /* released at out_nolock: */
+                cfs_down_read(&pool_tgt_rw_sem(pool));
+                osts = &(pool->pool_obds);
+                lqr = &(pool->pool_rr);
+        }
+
+        /* paired with obd_putref() just before returning */
+        obd_getref(exp->exp_obd);
+
+        /* wait for fresh statfs info if needed, the rpcs are sent in
+         * lov_create() */
+        qos_statfs_update(exp->exp_obd,
+                          cfs_time_shift_64(-2 * lov->desc.ld_qos_maxage), 1);
+
+        /* Detect -EAGAIN early, before expensive lock is taken. */
+        if (!lov->lov_qos.lq_dirty && lov->lov_qos.lq_same_space)
+                GOTO(out_nolock, rc = -EAGAIN);
+
+        /* Do actual allocation, use write lock here. */
+        cfs_down_write(&lov->lov_qos.lq_rw_sem);
+
+        /*
+         * Check again, while we were sleeping on @lq_rw_sem things could
+         * change.
+         */
+        if (!lov->lov_qos.lq_dirty && lov->lov_qos.lq_same_space)
+                GOTO(out, rc = -EAGAIN);
+
+        if (lov->desc.ld_active_tgt_count < 2)
+                GOTO(out, rc = -EAGAIN);
+
+        rc = qos_calc_ppo(exp->exp_obd);
+        if (rc)
+                GOTO(out, rc);
+
+        good_osts = 0;
+        /* Find all the OSTs that are valid stripe candidates */
+        for (i = 0; i < osts->op_count; i++) {
+                if (!lov->lov_tgts[osts->op_array[i]] ||
+                    !lov->lov_tgts[osts->op_array[i]]->ltd_active)
+                        continue;
+
+                /* Fail Check before osc_precreate() is called
+                   so we can only 'fail' single OSC. */
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && osts->op_array[i] == 0)
+                        continue;
+
+                if (obd_precreate(lov->lov_tgts[osts->op_array[i]]->ltd_exp) > 2)
+                        continue;
+
+                lov->lov_tgts[osts->op_array[i]]->ltd_qos.ltq_usable = 1;
+                qos_calc_weight(lov, osts->op_array[i]);
+                total_weight += lov->lov_tgts[osts->op_array[i]]->ltd_qos.ltq_weight;
+
+                good_osts++;
+        }
+
+#ifdef QOS_DEBUG
+        CDEBUG(D_QOS, "found %d good osts\n", good_osts);
+#endif
+
+        if (good_osts < stripe_cnt_min)
+                GOTO(out, rc = -EAGAIN);
+
+        /* We have enough osts */
+        if (good_osts < *stripe_cnt)
+                *stripe_cnt = good_osts;
+
+        if (!*stripe_cnt)
+                GOTO(out, rc = -EAGAIN);
+
+        /* Find enough OSTs with weighted random allocation. */
+        nfound = 0;
+        while (nfound < *stripe_cnt) {
+                __u64 rand, cur_weight;
+
+                cur_weight = 0;
+                /* stays -ENODEV unless the scan below picks an OST */
+                rc = -ENODEV;
+
+                if (total_weight) {
+#if BITS_PER_LONG == 32
+                        /* If total_weight > 32-bit, first generate the high
+                         * 32 bits of the random number, then add in the low
+                         * 32 bits (truncated to the upper limit, if needed).
+                         *
+                         * There is deliberately no up-front
+                         * "cfs_rand() % (unsigned)total_weight" here: its
+                         * result was always overwritten below, and it
+                         * divided by zero whenever the low 32 bits of
+                         * total_weight were 0. */
+                        if (total_weight > 0xffffffffULL)
+                                rand = (__u64)(cfs_rand() %
+                                          (unsigned)(total_weight >> 32)) << 32;
+                        else
+                                rand = 0;
+
+                        /* The high words can only match when total_weight
+                         * fits in 32 bits, so the divisor below is the
+                         * whole (non-zero) total_weight. */
+                        if (rand == (total_weight & 0xffffffff00000000ULL))
+                                rand |= cfs_rand() % (unsigned)total_weight;
+                        else
+                                rand |= cfs_rand();
+
+#else
+                        rand = ((__u64)cfs_rand() << 32 | cfs_rand()) %
+                                total_weight;
+#endif
+                } else {
+                        rand = 0;
+                }
+
+                /* On average, this will hit larger-weighted osts more often.
+                   0-weight osts will always get used last (only when rand=0).*/
+                for (i = 0; i < osts->op_count; i++) {
+                        if (!lov->lov_tgts[osts->op_array[i]] ||
+                            !lov->lov_tgts[osts->op_array[i]]->ltd_qos.ltq_usable)
+                                continue;
+
+                        cur_weight += lov->lov_tgts[osts->op_array[i]]->ltd_qos.ltq_weight;
+#ifdef QOS_DEBUG
+                        CDEBUG(D_QOS, "stripe_cnt=%d nfound=%d cur_weight="LPU64
+                                      " rand="LPU64" total_weight="LPU64"\n",
+                               *stripe_cnt, nfound, cur_weight, rand, total_weight);
+#endif
+                        if (cur_weight >= rand) {
+#ifdef QOS_DEBUG
+                                CDEBUG(D_QOS, "assigned stripe=%d to idx=%d\n",
+                                       nfound, osts->op_array[i]);
+#endif
+                                idx_arr[nfound++] = osts->op_array[i];
+                                /* qos_used() is handed total_weight by
+                                 * address, so the next draw sees whatever
+                                 * value it leaves there */
+                                qos_used(lov, osts, osts->op_array[i], &total_weight);
+                                rc = 0;
+                                break;
+                        }
+                }
+                /* should never satisfy below condition */
+                if (rc) {
+                        CERROR("Didn't find any OSTs?\n");
+                        break;
+                }
+        }
+        /* NOTE(review): if the loop above ever breaks out with rc set, this
+         * assertion fails before -ENODEV can be returned. */
+        LASSERT(nfound == *stripe_cnt);
+
+out:
+        cfs_up_write(&lov->lov_qos.lq_rw_sem);
+
+out_nolock:
+        if (pool != NULL) {
+                cfs_up_read(&pool_tgt_rw_sem(pool));
+                /* put back ref got by lov_find_pool() */
+                lov_pool_putref(pool);
+        }
+
+        /* Both locks are dropped by now; alloc_rr() looks the pool up and
+         * takes its own locks. */
+        if (rc == -EAGAIN)
+                rc = alloc_rr(lov, idx_arr, stripe_cnt, poolname, flags);
+
+        obd_putref(exp->exp_obd);
+        RETURN(rc);
+}
+
+/* return new alloced stripe count on success */
+static int alloc_idx_array(struct obd_export *exp, struct lov_stripe_md *lsm,
+ int newea, int **idx_arr, int *arr_cnt, int flags)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int stripe_cnt = lsm->lsm_stripe_count;