* GPL HEADER END
*/
/*
- * Copyright (c) 2013, Intel Corporation.
+ * Copyright (c) 2014, Intel Corporation.
*/
/*
* lustre/target/out_lib.c
#define OUT_UPDATE_BUFFER_SIZE_ADD 4096
#define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1MB update size now */
-
-struct dt_update_request*
-out_find_update(struct thandle_update *tu, struct dt_device *dt_dev)
-{
- struct dt_update_request *dt_update;
-
- list_for_each_entry(dt_update, &tu->tu_remote_update_list,
- dur_list) {
- if (dt_update->dur_dt == dt_dev)
- return dt_update;
- }
- return NULL;
-}
-EXPORT_SYMBOL(out_find_update);
-
-static struct object_update_request *object_update_request_alloc(size_t size)
-{
- struct object_update_request *ourq;
-
- OBD_ALLOC_LARGE(ourq, size);
- if (ourq == NULL)
- RETURN(ERR_PTR(-ENOMEM));
-
- ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
- ourq->ourq_count = 0;
-
- RETURN(ourq);
-}
-
-static void object_update_request_free(struct object_update_request *ourq,
- size_t ourq_size)
-{
- if (ourq != NULL)
- OBD_FREE_LARGE(ourq, ourq_size);
-}
-
-void dt_update_request_destroy(struct dt_update_request *dt_update)
-{
- if (dt_update == NULL)
- return;
-
- list_del(&dt_update->dur_list);
-
- object_update_request_free(dt_update->dur_buf.ub_req,
- dt_update->dur_buf.ub_req_size);
- OBD_FREE_PTR(dt_update);
-}
-EXPORT_SYMBOL(dt_update_request_destroy);
-
-/**
- * Allocate and initialize dt_update_request
- *
- * dt_update_request is being used to track updates being executed on
- * this dt_device(OSD or OSP). The update buffer will be 8k initially,
- * and increased if needed.
- *
- * \param [in] dt dt device
- *
- * \retval dt_update_request being allocated if succeed
- * \retval ERR_PTR(errno) if failed
- */
-struct dt_update_request *dt_update_request_create(struct dt_device *dt)
-{
- struct dt_update_request *dt_update;
- struct object_update_request *ourq;
-
- OBD_ALLOC_PTR(dt_update);
- if (!dt_update)
- return ERR_PTR(-ENOMEM);
-
- ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
- if (IS_ERR(ourq)) {
- OBD_FREE_PTR(dt_update);
- return ERR_CAST(ourq);
- }
-
- dt_update->dur_buf.ub_req = ourq;
- dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
-
- INIT_LIST_HEAD(&dt_update->dur_list);
- dt_update->dur_dt = dt;
- dt_update->dur_batchid = 0;
- INIT_LIST_HEAD(&dt_update->dur_cb_items);
-
- return dt_update;
-}
-EXPORT_SYMBOL(dt_update_request_create);
-
-/**
- * Find or create dt_update_request.
- *
- * Find or create one loc in th_dev/dev_obj_update for the update,
- * Because only one thread can access this thandle, no need
- * lock now.
- *
- * \param[in] th transaction handle
- * \param[in] dt lookup update request by dt_object
- *
- * \retval pointer of dt_update_request if it can be created
- * or found.
- * \retval ERR_PTR(errno) if it can not be created or found.
- */
-struct dt_update_request *
-dt_update_request_find_or_create(struct thandle *th, struct dt_object *dt)
-{
- struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
- struct thandle_update *tu = th->th_update;
- struct dt_update_request *update;
- ENTRY;
-
- if (tu == NULL) {
- OBD_ALLOC_PTR(tu);
- if (tu == NULL)
- RETURN(ERR_PTR(-ENOMEM));
-
- INIT_LIST_HEAD(&tu->tu_remote_update_list);
- tu->tu_sent_after_local_trans = 0;
- th->th_update = tu;
- }
-
- update = out_find_update(tu, dt_dev);
- if (update != NULL)
- RETURN(update);
-
- update = dt_update_request_create(dt_dev);
- if (IS_ERR(update))
- RETURN(update);
-
- list_add_tail(&update->dur_list, &tu->tu_remote_update_list);
-
- if (!tu->tu_only_remote_trans)
- thandle_get(th);
-
- RETURN(update);
-}
-EXPORT_SYMBOL(dt_update_request_find_or_create);
-
-/**
- * Prepare update request.
- *
- * Prepare OUT update ptlrpc request, and the request usually includes
- * all of updates (stored in \param ureq) from one operation.
- *
- * \param[in] env execution environment
- * \param[in] imp import on which ptlrpc request will be sent
- * \param[in] ureq hold all of updates which will be packed into the req
- * \param[in] reqp request to be created
- *
- * \retval 0 if preparation succeeds.
- * \retval negative errno if preparation fails.
- */
-int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
- const struct object_update_request *ureq,
- struct ptlrpc_request **reqp)
-{
- struct ptlrpc_request *req;
- struct object_update_request *tmp;
- int ureq_len;
- int rc;
- ENTRY;
-
- req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- ureq_len = object_update_request_size(ureq);
- req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
- ureq_len);
-
- rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
- if (rc != 0) {
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
- RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
-
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
- memcpy(tmp, ureq, ureq_len);
-
- ptlrpc_request_set_replen(req);
- req->rq_request_portal = OUT_PORTAL;
- req->rq_reply_portal = OSC_REPLY_PORTAL;
- *reqp = req;
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(out_prep_update_req);
-
-/**
- * Send update RPC.
- *
- * Send update request to the remote MDT synchronously.
- *
- * \param[in] env execution environment
- * \param[in] imp import on which ptlrpc request will be sent
- * \param[in] dt_update hold all of updates which will be packed into the req
- * \param[in] reqp request to be created
- *
- * \retval 0 if RPC succeeds.
- * \retval negative errno if RPC fails.
- */
-int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
- struct dt_update_request *dt_update,
- struct ptlrpc_request **reqp)
-{
- struct ptlrpc_request *req = NULL;
- int rc;
- ENTRY;
-
- rc = out_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
- if (rc != 0)
- RETURN(rc);
-
- /* Note: some dt index api might return non-zero result here, like
- * osd_index_ea_lookup, so we should only check rc < 0 here */
- rc = ptlrpc_queue_wait(req);
- if (rc < 0) {
- ptlrpc_req_finished(req);
- dt_update->dur_rc = rc;
- RETURN(rc);
- }
-
- if (reqp != NULL) {
- *reqp = req;
- RETURN(rc);
- }
-
- dt_update->dur_rc = rc;
-
- ptlrpc_req_finished(req);
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(out_remote_sync);
-
/**
* resize update buffer
*
* \param[in] batchid batchid(transaction no) of this update
*
* \retval 0 pack update succeed.
- * negative errno pack update failed.
+ * \retval negative errno pack update failed.
**/
-static struct object_update*
+static struct object_update *
out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf,
enum update_type op, const struct lu_fid *fid,
int params_count, __u16 *param_sizes, __u64 batchid)
param = (struct object_update_param *)((char *)param +
object_update_param_size(param));
}
- ureq->ourq_count++;
CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n",
ureq, PFID(fid), ureq->ourq_count, op, params_count,
(int)update_size);
+ ureq->ourq_count++;
RETURN(obj_update);
}