+static int ofd_ladvise_prefetch(const struct lu_env *env,
+ struct ofd_object *fo,
+ struct niobuf_local *lnb,
+ __u64 start, __u64 end, enum dt_bufs_type dbt)
+{
+ struct ofd_thread_info *info = ofd_info(env);
+ pgoff_t start_index, end_index, pages;
+ struct niobuf_remote rnb;
+ unsigned long nr_local;
+ int rc = 0;
+
+ if (end <= start)
+ RETURN(-EINVAL);
+
+ ofd_read_lock(env, fo);
+ if (!ofd_object_exists(fo))
+ GOTO(out_unlock, rc = -ENOENT);
+
+ rc = ofd_attr_get(env, fo, &info->fti_attr);
+ if (rc)
+ GOTO(out_unlock, rc);
+
+ if (end > info->fti_attr.la_size)
+ end = info->fti_attr.la_size;
+
+ if (end <= start)
+ GOTO(out_unlock, rc);
+
+ /* We need page aligned offset and length */
+ start_index = start >> PAGE_SHIFT;
+ end_index = (end - 1) >> PAGE_SHIFT;
+ pages = end_index - start_index + 1;
+ while (pages > 0) {
+ nr_local = pages <= PTLRPC_MAX_BRW_PAGES ? pages :
+ PTLRPC_MAX_BRW_PAGES;
+ rnb.rnb_offset = start_index << PAGE_SHIFT;
+ rnb.rnb_len = nr_local << PAGE_SHIFT;
+ rc = dt_bufs_get(env, ofd_object_child(fo), &rnb, lnb, dbt);
+ if (unlikely(rc < 0))
+ break;
+ nr_local = rc;
+ rc = dt_read_prep(env, ofd_object_child(fo), lnb, nr_local);
+ dt_bufs_put(env, ofd_object_child(fo), lnb, nr_local);
+ if (unlikely(rc))
+ break;
+ start_index += nr_local;
+ pages -= nr_local;
+ }
+
+out_unlock:
+ ofd_read_unlock(env, fo);
+ RETURN(rc);
+}
+
+/**
+ * OFD request handler for OST_LADVISE RPC.
+ *
+ * Tune cache or perfetch policies according to advices.
+ *
+ * \param[in] tsi target session environment for this request
+ *
+ * \retval 0 if successful
+ * \retval negative errno on error
+ */
+static int ofd_ladvise_hdl(struct tgt_session_info *tsi)
+{
+ struct ptlrpc_request *req = tgt_ses_req(tsi);
+ struct obd_export *exp = tsi->tsi_exp;
+ struct ofd_device *ofd = ofd_exp(exp);
+ struct ost_body *body, *repbody;
+ struct ofd_thread_info *info;
+ struct ofd_object *fo;
+ struct ptlrpc_thread *svc_thread = req->rq_svc_thread;
+ const struct lu_env *env = svc_thread->t_env;
+ struct tgt_thread_big_cache *tbc = svc_thread->t_data;
+ enum dt_bufs_type dbt = DT_BUFS_TYPE_READAHEAD;
+ struct lu_ladvise *ladvise;
+ int num_advise;
+ struct ladvise_hdr *ladvise_hdr;
+ struct obd_ioobj ioo;
+ struct lustre_handle lockh = { 0 };
+ __u64 flags = 0;
+ int i;
+ struct dt_object *dob;
+ __u64 start;
+ __u64 end;
+ int rc = 0;
+ ENTRY;
+
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OST_LADVISE_PAUSE, cfs_fail_val);
+ body = tsi->tsi_ost_body;
+
+ if ((body->oa.o_valid & OBD_MD_FLID) != OBD_MD_FLID)
+ RETURN(err_serious(-EPROTO));
+
+ ladvise_hdr = req_capsule_client_get(tsi->tsi_pill,
+ &RMF_OST_LADVISE_HDR);
+ if (ladvise_hdr == NULL)
+ RETURN(err_serious(-EPROTO));
+
+ if (ladvise_hdr->lah_magic != LADVISE_MAGIC ||
+ ladvise_hdr->lah_count < 1)
+ RETURN(err_serious(-EPROTO));
+
+ if ((ladvise_hdr->lah_flags & (~LF_MASK)) != 0)
+ RETURN(err_serious(-EPROTO));
+
+ ladvise = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_LADVISE);
+ if (ladvise == NULL)
+ RETURN(err_serious(-EPROTO));
+
+ num_advise = req_capsule_get_size(&req->rq_pill,
+ &RMF_OST_LADVISE, RCL_CLIENT) /
+ sizeof(*ladvise);
+ if (num_advise < ladvise_hdr->lah_count)
+ RETURN(err_serious(-EPROTO));
+
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ repbody->oa = body->oa;
+
+ info = ofd_info_init(env, exp);
+
+ rc = ostid_to_fid(&info->fti_fid, &body->oa.o_oi,
+ ofd->ofd_lut.lut_lsd.lsd_osd_index);
+ if (rc != 0)
+ RETURN(rc);
+
+ fo = ofd_object_find(env, ofd, &info->fti_fid);
+ if (IS_ERR(fo)) {
+ rc = PTR_ERR(fo);
+ RETURN(rc);
+ }
+ LASSERT(fo != NULL);
+ dob = ofd_object_child(fo);
+
+ if (ptlrpc_connection_is_local(exp->exp_connection))
+ dbt |= DT_BUFS_TYPE_LOCAL;
+
+ for (i = 0; i < num_advise; i++, ladvise++) {
+ start = ladvise->lla_start;
+ end = ladvise->lla_end;
+ if (end <= start) {
+ rc = err_serious(-EPROTO);
+ break;
+ }
+
+ /* Handle different advice types */
+ switch (ladvise->lla_advice) {
+ default:
+ rc = -ENOTSUPP;
+ break;
+ case LU_LADVISE_WILLREAD:
+ if (tbc == NULL)
+ RETURN(-ENOMEM);
+
+ ioo.ioo_oid = body->oa.o_oi;
+ ioo.ioo_bufcnt = 1;
+ rc = tgt_extent_lock(exp->exp_obd->obd_namespace,
+ &tsi->tsi_resid, start, end - 1,
+ &lockh, LCK_PR, &flags);
+ if (rc != 0)
+ break;
+
+ req->rq_status = ofd_ladvise_prefetch(env, fo,
+ tbc->local,
+ start, end, dbt);
+ tgt_extent_unlock(&lockh, LCK_PR);
+ break;
+ case LU_LADVISE_DONTNEED:
+ rc = dt_ladvise(env, dob, ladvise->lla_start,
+ ladvise->lla_end, LU_LADVISE_DONTNEED);
+ break;
+ }
+ if (rc != 0)
+ break;
+ }
+
+ ofd_object_put(env, fo);
+ req->rq_status = rc;
+ RETURN(rc);
+}
+
+/**
+ * OFD request handler for OST_QUOTACTL RPC.
+ *
+ * This is part of request processing to validate incoming request fields,
+ * get the requested data from OSD and pack reply.
+ *
+ * \param[in] tsi target session environment for this request
+ *
+ * \retval 0 if successful
+ * \retval negative value on error
+ */