* GPL HEADER END
*/
/*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2014, 2015, Intel Corporation.
*/
/*
* lustre/osp/osp_trans.c
ours->ours_req_size = size;
INIT_LIST_HEAD(&ours->ours_list);
list_add_tail(&ours->ours_list, &our->our_req_list);
+ our->our_req_nr++;
return 0;
}
update = object_update_request_get(ourq, i, &size);
LASSERT(update != NULL);
- CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u"
- "params = %d batchid = "LPU64" size = %zu\n",
+ CDEBUG(mask, "i = %u fid = "DFID" op = %s "
+ "params = %d batchid = "LPU64" size = %zu repsize %u\n",
i, PFID(&update->ou_fid),
update_op_str(update->ou_type),
- update->ou_master_index, update->ou_params_count,
- update->ou_batchid, size);
+ update->ou_params_count,
+ update->ou_batchid, size,
+ (unsigned)update->ou_result_size);
total_size += size;
}
}
/**
+ * Prepare inline update request
+ *
+ * Prepare OUT update ptlrpc inline request, and the request usually includes
+ * one update buffer, which does not need bulk transfer.
+ *
+ * \param[in] env execution environment
+ * \param[in] req ptlrpc request
+ * \param[in] our	osp_update_request to be packed
+ * \param[in] repsize	bytes to reserve for the server-side update reply
+ *
+ * \retval 0 if packing succeeds
+ * \retval negative errno if packing fails
+ */
+int osp_prep_inline_update_req(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ struct osp_update_request *our,
+ int repsize)
+{
+ struct osp_update_request_sub *ours;
+ struct out_update_header *ouh;
+ __u32 update_req_size;
+ int rc;
+
+ /* An inline request packs exactly one update buffer, so only the
+  * first (and only) sub-request on our_req_list is used; the caller
+  * is expected to have checked the list is non-empty. */
+ ours = list_entry(our->our_req_list.next,
+ struct osp_update_request_sub, ours_list);
+ update_req_size = object_update_request_size(ours->ours_req);
+ /* Header buffer holds the out_update_header plus the inline payload. */
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
+ update_req_size + sizeof(*ouh));
+
+ rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* Non-zero ouh_inline_length marks the payload as inline, i.e. no
+  * bulk transfer is needed for this request. */
+ ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
+ ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
+ ouh->ouh_count = 1;
+ ouh->ouh_inline_length = update_req_size;
+ ouh->ouh_reply_size = repsize;
+
+ memcpy(ouh->ouh_inline_data, ours->ours_req, update_req_size);
+
+ /* Reserve the reply buffer at the size computed by the caller. */
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+ RCL_SERVER, repsize);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OUT_PORTAL;
+ req->rq_reply_portal = OSC_REPLY_PORTAL;
+
+ /* rc is 0 here; returned for symmetry with the error path above. */
+ RETURN(rc);
+}
+
+/**
* Prepare update request.
*
* Prepare OUT update ptlrpc request, and the request usually includes
struct ptlrpc_request *req;
struct ptlrpc_bulk_desc *desc;
struct osp_update_request_sub *ours;
+ const struct object_update_request *ourq;
struct out_update_header *ouh;
struct out_update_buffer *oub;
__u32 buf_count = 0;
- int rc;
+ int repsize = 0;
+ struct object_update_reply *reply;
+ int rc, i;
+ int total = 0;
ENTRY;
list_for_each_entry(ours, &our->our_req_list, ours_list) {
object_update_request_dump(ours->ours_req, D_INFO);
+
+ ourq = ours->ours_req;
+ for (i = 0; i < ourq->ourq_count; i++) {
+ struct object_update *update;
+ size_t size = 0;
+
+
+ /* XXX: it's very inefficient to lookup update
+ * this way, iterating from the beginning
+ * each time */
+ update = object_update_request_get(ourq, i, &size);
+ LASSERT(update != NULL);
+
+ repsize += sizeof(reply->ourp_lens[0]);
+ repsize += sizeof(struct object_update_result);
+ repsize += update->ou_result_size;
+ }
+
buf_count++;
}
+ repsize += sizeof(*reply);
+ repsize = (repsize + OUT_UPDATE_REPLY_SIZE - 1) &
+ ~(OUT_UPDATE_REPLY_SIZE - 1);
+ LASSERT(buf_count > 0);
req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
if (req == NULL)
RETURN(-ENOMEM);
+ if (buf_count == 1) {
+ ours = list_entry(our->our_req_list.next,
+ struct osp_update_request_sub, ours_list);
+
+ /* Let's check if it can be packed inline */
+ if (object_update_request_size(ours->ours_req) +
+ sizeof(struct out_update_header) <
+ OUT_UPDATE_MAX_INLINE_SIZE) {
+ rc = osp_prep_inline_update_req(env, req, our, repsize);
+ if (rc == 0)
+ *reqp = req;
+ GOTO(out_req, rc);
+ }
+ }
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
+ sizeof(struct osp_update_request));
+
req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
buf_count * sizeof(*oub));
ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
ouh->ouh_count = buf_count;
-
+ ouh->ouh_inline_length = 0;
+ ouh->ouh_reply_size = repsize;
oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
list_for_each_entry(ours, &our->our_req_list, ours_list) {
oub->oub_size = ours->ours_req_size;
GOTO(out_req, rc = -ENOMEM);
/* NB req now owns desc and will free it when it gets freed */
- list_for_each_entry(ours, &our->our_req_list, ours_list)
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
ours->ours_req_size);
+ total += ours->ours_req_size;
+ }
+ CDEBUG(D_OTHER, "total %d in %u\n", total, our->our_update_nr);
req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
- RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+ RCL_SERVER, repsize);
ptlrpc_request_set_replen(req);
req->rq_request_portal = OUT_PORTAL;
* \retval 0 if RPC succeeds.
* \retval negative errno if RPC fails.
*/
-
int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
struct osp_update_request *our,
struct ptlrpc_request **reqp)
* \param[in] lens buffer length array for the subsequent \a bufs
* \param[in] bufs the buffers to compose the request
* \param[in] data pointer to the data used by the interpreter
+ * \param[in] repsize how many bytes the caller allocated for \a data
* \param[in] interpreter pointer to the interpreter function
*
* \retval 0 for success
*/
int osp_insert_async_request(const struct lu_env *env, enum update_type op,
struct osp_object *obj, int count,
- __u16 *lens, const void **bufs, void *data,
+ __u16 *lens, const void **bufs,
+ void *data, __u32 repsize,
osp_update_interpreter_t interpreter)
{
struct osp_device *osp;
object_update = update_buffer_get_update(ureq, ureq->ourq_count);
rc = out_update_pack(env, object_update, &max_update_size, op,
- lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
+ lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
+ repsize);
/* The queue is full. */
if (rc == -E2BIG) {
osp->opd_async_requests = NULL;
RETURN(rc);
ureq->ourq_count++;
+ our->our_update_nr++;
}
rc = osp_insert_update_callback(env, our, obj, data, interpreter);
oth->ot_our = our;
our->our_th = oth;
- if (oth->ot_super.th_sync)
- oth->ot_our->our_flags |= UPDATE_FL_SYNC;
-
return 0;
}
if (top_device->ld_obd->obd_recovering)
req->rq_allow_replay = 1;
- osp_get_rpc_lock(osp);
+ if (osp->opd_connect_mdt)
+ osp_get_rpc_lock(osp);
rc = ptlrpc_queue_wait(req);
- osp_put_rpc_lock(osp);
+ if (osp->opd_connect_mdt)
+ osp_put_rpc_lock(osp);
if ((rc == -ENOMEM && req->rq_set == NULL) ||
(req->rq_transno == 0 && !req->rq_committed)) {
if (args->oaua_update != NULL) {
{
struct osp_thandle *oth = thandle_to_osp_thandle(th);
+ if (oth->ot_super.th_sync)
+ oth->ot_our->our_flags |= UPDATE_FL_SYNC;
/* For remote thandle, if there are local thandle, start it here*/
if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
return dt_trans_start(env, oth->ot_storage_th->th_dev,
}
if (!osp->opd_connect_mdt) {
+ osp_trans_callback(env, oth, th->th_result);
rc = osp_send_update_req(env, osp, oth->ot_our);
GOTO(out, rc);
}