+/**
+ * Allocate new update request
+ *
+ * Allocate new update request and insert it to the req_update_list.
+ *
+ * \param [in] our osp_udate_request where to create a new
+ * update request
+ *
+ * \retval 0 if creation succeeds.
+ * \retval negative errno if creation fails.
+ */
+int osp_object_update_request_create(struct osp_update_request *our,
+ size_t size)
+{
+ struct osp_update_request_sub *ours;
+ struct object_update_request *ourq;
+
+ OBD_ALLOC_PTR(ours);
+ if (ours == NULL)
+ return -ENOMEM;
+
+ /* The object update request will be added to an SG list for
+ * bulk transfer. Some IB HW cannot handle partial pages in SG
+ * lists (since they create gaps in memory regions) so we
+ * round the size up to the next multiple of PAGE_SIZE. See
+ * LU-9983. */
+ LASSERT(size > 0);
+ size = round_up(size, PAGE_SIZE);
+ OBD_ALLOC_LARGE(ourq, size);
+ if (ourq == NULL) {
+ OBD_FREE_PTR(ours);
+ return -ENOMEM;
+ }
+
+ ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
+ ourq->ourq_count = 0;
+ ours->ours_req = ourq;
+ ours->ours_req_size = size;
+ INIT_LIST_HEAD(&ours->ours_list);
+ list_add_tail(&ours->ours_list, &our->our_req_list);
+ our->our_req_nr++;
+
+ return 0;
+}
+
+/**
+ * Get current update request
+ *
+ * Get current object update request from our_req_list in
+ * osp_update_request, because we always insert the new update
+ * request in the last position, so the last update request
+ * in the list will be the current update req.
+ *
+ * \param[in] our osp update request where to get the
+ * current object update.
+ *
+ * \retval the current object update.
+ **/
+struct osp_update_request_sub *
+osp_current_object_update_request(struct osp_update_request *our)
+{
+ if (list_empty(&our->our_req_list))
+ return NULL;
+
+ return list_entry(our->our_req_list.prev, struct osp_update_request_sub,
+ ours_list);
+}
+
+/**
+ * Allocate and initialize osp_update_request
+ *
+ * osp_update_request is being used to track updates being executed on
+ * this dt_device(OSD or OSP). The update buffer will be 4k initially,
+ * and increased if needed.
+ *
+ * \param [in] dt dt device
+ *
+ * \retval osp_update_request being allocated if succeed
+ * \retval ERR_PTR(errno) if failed
+ */
+struct osp_update_request *osp_update_request_create(struct dt_device *dt)
+{
+ struct osp_update_request *our;
+ int rc;
+
+ OBD_ALLOC_PTR(our);
+ if (our == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ INIT_LIST_HEAD(&our->our_req_list);
+ INIT_LIST_HEAD(&our->our_cb_items);
+ INIT_LIST_HEAD(&our->our_list);
+ INIT_LIST_HEAD(&our->our_invalidate_cb_list);
+ spin_lock_init(&our->our_list_lock);
+
+ rc = osp_object_update_request_create(our, PAGE_SIZE);
+ if (rc != 0) {
+ OBD_FREE_PTR(our);
+ return ERR_PTR(rc);
+ }
+ return our;
+}
+
+void osp_update_request_destroy(const struct lu_env *env,
+ struct osp_update_request *our)
+{
+ struct osp_update_request_sub *ours;
+ struct osp_update_request_sub *tmp;
+
+ if (our == NULL)
+ return;
+
+ list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) {
+ list_del(&ours->ours_list);
+ if (ours->ours_req != NULL)
+ OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size);
+ OBD_FREE_PTR(ours);
+ }
+
+ if (!list_empty(&our->our_invalidate_cb_list)) {
+ struct lu_env lenv;
+ struct osp_object *obj;
+ struct osp_object *next;
+
+ if (env == NULL) {
+ lu_env_init(&lenv, LCT_MD_THREAD | LCT_DT_THREAD);
+ env = &lenv;
+ }
+
+ list_for_each_entry_safe(obj, next,
+ &our->our_invalidate_cb_list,
+ opo_invalidate_cb_list) {
+ spin_lock(&obj->opo_lock);
+ list_del_init(&obj->opo_invalidate_cb_list);
+ spin_unlock(&obj->opo_lock);
+
+ dt_object_put(env, &obj->opo_obj);
+ }
+
+ if (env == &lenv)
+ lu_env_fini(&lenv);
+ }
+
+ OBD_FREE_PTR(our);
+}
+
+static void
+object_update_request_dump(const struct object_update_request *ourq,
+ unsigned int mask)
+{
+ unsigned int i;
+ size_t total_size = 0;
+
+ for (i = 0; i < ourq->ourq_count; i++) {
+ struct object_update *update;
+ size_t size = 0;
+
+ update = object_update_request_get(ourq, i, &size);
+ LASSERT(update != NULL);
+ CDEBUG(mask, "i = %u fid = "DFID" op = %s "
+ "params = %d batchid = %llu size = %zu repsize %u\n",
+ i, PFID(&update->ou_fid),
+ update_op_str(update->ou_type),
+ update->ou_params_count,
+ update->ou_batchid, size,
+ (unsigned)update->ou_result_size);
+
+ total_size += size;
+ }
+
+ CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
+ ourq->ourq_magic, ourq->ourq_count, total_size);
+}
+
+/**
+ * Prepare inline update request
+ *
+ * Prepare OUT update ptlrpc inline request, and the request usually includes
+ * one update buffer, which does not need bulk transfer.
+ *
+ * \param[in] env execution environment
+ * \param[in] req ptlrpc request
+ * \param[in] ours sub osp_update_request to be packed
+ *
+ * \retval 0 if packing succeeds
+ * \retval negative errno if packing fails
+ */
+int osp_prep_inline_update_req(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ struct osp_update_request *our,
+ int repsize)
+{
+ struct osp_update_request_sub *ours;
+ struct out_update_header *ouh;
+ __u32 update_req_size;
+ int rc;
+
+ ours = list_entry(our->our_req_list.next,
+ struct osp_update_request_sub, ours_list);
+ update_req_size = object_update_request_size(ours->ours_req);
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
+ update_req_size + sizeof(*ouh));
+
+ rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
+ if (rc != 0)
+ RETURN(rc);
+
+ ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
+ ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
+ ouh->ouh_count = 1;
+ ouh->ouh_inline_length = update_req_size;
+ ouh->ouh_reply_size = repsize;
+
+ memcpy(ouh->ouh_inline_data, ours->ours_req, update_req_size);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+ RCL_SERVER, repsize);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OUT_PORTAL;
+ req->rq_reply_portal = OSC_REPLY_PORTAL;
+
+ RETURN(rc);
+}
+
+/**
+ * Prepare update request.
+ *
+ * Prepare OUT update ptlrpc request, and the request usually includes
+ * all of updates (stored in \param ureq) from one operation.
+ *
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] ureq hold all of updates which will be packed into the req
+ * \param[in] reqp request to be created
+ *
+ * \retval 0 if preparation succeeds.
+ * \retval negative errno if preparation fails.
+ */
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+ struct osp_update_request *our,
+ struct ptlrpc_request **reqp)
+{
+ struct ptlrpc_request *req;
+ struct ptlrpc_bulk_desc *desc;
+ struct osp_update_request_sub *ours;
+ const struct object_update_request *ourq;
+ struct out_update_header *ouh;
+ struct out_update_buffer *oub;
+ __u32 buf_count = 0;
+ int repsize = 0;
+ struct object_update_reply *reply;
+ int rc, i;
+ int total = 0;
+ ENTRY;
+
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ object_update_request_dump(ours->ours_req, D_INFO);
+
+ ourq = ours->ours_req;
+ for (i = 0; i < ourq->ourq_count; i++) {
+ struct object_update *update;
+ size_t size = 0;
+
+
+ /* XXX: it's very inefficient to lookup update
+ * this way, iterating from the beginning
+ * each time */
+ update = object_update_request_get(ourq, i, &size);
+ LASSERT(update != NULL);
+
+ repsize += sizeof(reply->ourp_lens[0]);
+ repsize += sizeof(struct object_update_result);
+ repsize += update->ou_result_size;
+ }
+
+ buf_count++;
+ }
+ repsize += sizeof(*reply);
+ if (repsize < OUT_UPDATE_REPLY_SIZE)
+ repsize = OUT_UPDATE_REPLY_SIZE;
+ LASSERT(buf_count > 0);
+
+ req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ if (buf_count == 1) {
+ ours = list_entry(our->our_req_list.next,
+ struct osp_update_request_sub, ours_list);
+
+ /* Let's check if it can be packed inline */
+ if (object_update_request_size(ours->ours_req) +
+ sizeof(struct out_update_header) <
+ OUT_UPDATE_MAX_INLINE_SIZE) {
+ rc = osp_prep_inline_update_req(env, req, our, repsize);
+ if (rc == 0)
+ *reqp = req;
+ GOTO(out_req, rc);
+ }
+ }
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
+ sizeof(struct osp_update_request));
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
+ buf_count * sizeof(*oub));
+
+ rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
+ if (rc != 0)
+ GOTO(out_req, rc);
+
+ ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
+ ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
+ ouh->ouh_count = buf_count;
+ ouh->ouh_inline_length = 0;
+ ouh->ouh_reply_size = repsize;
+ oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ oub->oub_size = ours->ours_req_size;
+ oub++;
+ }
+
+ req->rq_bulk_write = 1;
+ desc = ptlrpc_prep_bulk_imp(req, buf_count,
+ MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
+ PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
+ MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
+ if (desc == NULL)
+ GOTO(out_req, rc = -ENOMEM);
+
+ /* NB req now owns desc and will free it when it gets freed */
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
+ ours->ours_req_size);
+ total += ours->ours_req_size;
+ }
+ CDEBUG(D_OTHER, "total %d in %u\n", total, our->our_update_nr);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+ RCL_SERVER, repsize);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OUT_PORTAL;
+ req->rq_reply_portal = OSC_REPLY_PORTAL;
+ *reqp = req;
+
+out_req:
+ if (rc < 0)
+ ptlrpc_req_finished(req);
+
+ RETURN(rc);
+}
+
+/**
+ * Send update RPC.
+ *
+ * Send update request to the remote MDT synchronously.
+ *
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] our hold all of updates which will be packed into the req
+ * \param[in] reqp request to be created
+ *
+ * \retval 0 if RPC succeeds.
+ * \retval negative errno if RPC fails.
+ */
+int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
+ struct osp_update_request *our,
+ struct ptlrpc_request **reqp)
+{
+ struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
+ struct ptlrpc_request *req = NULL;
+ int rc;
+ ENTRY;
+
+ rc = osp_prep_update_req(env, imp, our, &req);
+ if (rc != 0)
+ RETURN(rc);
+
+ osp_set_req_replay(osp, req);
+ req->rq_allow_intr = 1;
+
+ /* Note: some dt index api might return non-zero result here, like
+ * osd_index_ea_lookup, so we should only check rc < 0 here */
+ rc = ptlrpc_queue_wait(req);
+ our->our_rc = rc;
+ if (rc < 0 || reqp == NULL)
+ ptlrpc_req_finished(req);
+ else
+ *reqp = req;
+
+ RETURN(rc);
+}
+
+/**
+ * Invalidate all objects in the osp thandle
+ *
+ * invalidate all of objects in the update request, which will be called
+ * when the transaction is aborted.
+ *
+ * \param[in] oth osp thandle.
+ */
+static void osp_thandle_invalidate_object(const struct lu_env *env,
+ struct osp_thandle *oth,
+ int result)
+{
+ struct osp_update_request *our = oth->ot_our;
+ struct osp_object *obj;
+ struct osp_object *next;
+
+ if (our == NULL)
+ return;
+
+ list_for_each_entry_safe(obj, next, &our->our_invalidate_cb_list,
+ opo_invalidate_cb_list) {
+ if (result < 0)
+ osp_invalidate(env, &obj->opo_obj);
+
+ spin_lock(&obj->opo_lock);
+ list_del_init(&obj->opo_invalidate_cb_list);
+ spin_unlock(&obj->opo_lock);
+
+ dt_object_put(env, &obj->opo_obj);
+ }
+}
+
+static void osp_trans_stop_cb(const struct lu_env *env,
+ struct osp_thandle *oth, int result)
+{
+ struct dt_txn_commit_cb *dcb;
+ struct dt_txn_commit_cb *tmp;
+
+ /* call per-transaction stop callbacks if any */
+ list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
+ dcb_linkage) {
+ LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
+ "commit callback entry: magic=%x name='%s'\n",
+ dcb->dcb_magic, dcb->dcb_name);
+ list_del_init(&dcb->dcb_linkage);
+ dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
+ }
+
+ osp_thandle_invalidate_object(env, oth, result);
+}
+
+/**
+ * Allocate an osp request and initialize it with the given parameters.
+ *
+ * \param[in] obj pointer to the operation target
+ * \param[in] data pointer to the data used by the interpreter
+ * \param[in] interpreter pointer to the interpreter function
+ *
+ * \retval pointer to the asychronous request
+ * \retval NULL if the allocation failed
+ */
+static struct osp_update_callback *
+osp_update_callback_init(struct osp_object *obj, void *data,
+ osp_update_interpreter_t interpreter)