-static int osp_prep_update_req(const struct lu_env *env,
- struct osp_device *osp,
- struct update_buf *ubuf, int ubuf_len,
- struct ptlrpc_request **reqp)
-{
- struct obd_import *imp;
- struct ptlrpc_request *req;
- struct update_buf *tmp;
- int rc;
- ENTRY;
-
- imp = osp->opd_obd->u.cli.cl_import;
- LASSERT(imp);
-
- req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
- UPDATE_BUFFER_SIZE);
-
- rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
- if (rc != 0) {
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
- UPDATE_BUFFER_SIZE);
-
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
- memcpy(tmp, ubuf, ubuf_len);
-
- ptlrpc_request_set_replen(req);
-
- *reqp = req;
-
- RETURN(rc);
-}
-
-static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
- struct update_request *update,
- struct ptlrpc_request **reqp)
-{
- struct osp_device *osp = dt2osp_dev(dt);
- struct ptlrpc_request *req = NULL;
- int rc;
- ENTRY;
-
- rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
- &req);
- if (rc)
- RETURN(rc);
-
- /* Note: some dt index api might return non-zero result here, like
- * osd_index_ea_lookup, so we should only check rc < 0 here */
- rc = ptlrpc_queue_wait(req);
- if (rc < 0) {
- ptlrpc_req_finished(req);
- update->ur_rc = rc;
- RETURN(rc);
- }
-
- if (reqp != NULL) {
- *reqp = req;
- RETURN(rc);
- }
-
- update->ur_rc = rc;
-
- ptlrpc_req_finished(req);
-
- RETURN(rc);
-}
-
-/**
- * Create a new update request for the device.
- */
-static struct update_request
-*osp_create_update_req(struct dt_device *dt)
-{
- struct update_request *update;
-
- OBD_ALLOC_PTR(update);
- if (!update)
- return ERR_PTR(-ENOMEM);
-
- OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
- if (update->ur_buf == NULL) {
- OBD_FREE_PTR(update);
- return ERR_PTR(-ENOMEM);
- }
-
- CFS_INIT_LIST_HEAD(&update->ur_list);
- update->ur_dt = dt;
- update->ur_batchid = 0;
- update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
- update->ur_buf->ub_count = 0;
-
- return update;
-}
-
-static void osp_destroy_update_req(struct update_request *update)
-{
- if (update == NULL)
- return;
-
- cfs_list_del(&update->ur_list);
- if (update->ur_buf != NULL)
- OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
-
- OBD_FREE_PTR(update);
- return;
-}
-
-int osp_trans_stop(const struct lu_env *env, struct thandle *th)
-{
- int rc = 0;
-
- rc = th->th_current_request->ur_rc;
- osp_destroy_update_req(th->th_current_request);
- th->th_current_request = NULL;
-
- return rc;
-}
-
-/**
- * In DNE phase I, all remote updates will be packed into RPC (the format
- * description is in lustre_idl.h) during declare phase, all of updates
- * are attached to the transaction, one entry per OSP. Then in trans start,
- * LOD will walk through these entries and send these UPDATEs to the remote
- * MDT to be executed synchronously.
- */
-int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
- struct thandle *th)
-{
- struct update_request *update;
- int rc = 0;
-
- /* In phase I, if the transaction includes remote updates, the local
- * update should be synchronized, so it will set th_sync = 1 */
- update = th->th_current_request;
- LASSERT(update != NULL && update->ur_dt == dt);
- if (update->ur_buf->ub_count > 0) {
- rc = osp_remote_sync(env, dt, update, NULL);
- th->th_sync = 1;
- }
-
- RETURN(rc);
-}
-
-/**
- * Insert the update into the th_bufs for the device.
- */
-static int osp_insert_update(const struct lu_env *env,
- struct update_request *update, int op,
- struct lu_fid *fid, int count,
- int *lens, char **bufs)
-{
- struct update_buf *ubuf = update->ur_buf;
- struct update *obj_update;
- char *ptr;
- int i;
- int update_length;
- int rc = 0;
- ENTRY;
-
- obj_update = (struct update *)((char *)ubuf +
- cfs_size_round(update_buf_size(ubuf)));
-
- /* Check update size to make sure it can fit into the buffer */
- update_length = cfs_size_round(offsetof(struct update,
- u_bufs[0]));
- for (i = 0; i < count; i++)
- update_length += cfs_size_round(lens[i]);
-
- if (cfs_size_round(update_buf_size(ubuf)) + update_length >
- UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS) {
- CERROR("%s: insert up %p, idx %d cnt %d len %lu: rc = %d\n",
- update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf,
- update_length, ubuf->ub_count, update_buf_size(ubuf),
- -E2BIG);
- RETURN(-E2BIG);
- }
-
- if (count > UPDATE_BUF_COUNT) {
- CERROR("%s: Insert too much params %d "DFID" op %d: rc = %d\n",
- update->ur_dt->dd_lu_dev.ld_obd->obd_name, count,
- PFID(fid), op, -E2BIG);
- RETURN(-E2BIG);
- }
-
- /* fill the update into the update buffer */
- fid_cpu_to_le(&obj_update->u_fid, fid);
- obj_update->u_type = cpu_to_le32(op);
- obj_update->u_batchid = update->ur_batchid;
- for (i = 0; i < count; i++)
- obj_update->u_lens[i] = cpu_to_le32(lens[i]);
-
- ptr = (char *)obj_update +
- cfs_size_round(offsetof(struct update, u_bufs[0]));
- for (i = 0; i < count; i++)
- LOGL(bufs[i], lens[i], ptr);
-
- ubuf->ub_count++;
-
- CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
- update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
- ubuf->ub_count, op, count, update_buf_size(ubuf));
-
- RETURN(rc);
-}
-
-static struct update_request
-*osp_find_update(struct thandle *th, struct dt_device *dt_dev)
-{
- struct update_request *update;
-
- /* Because transaction api does not proivde the interface
- * to transfer the update from LOD to OSP, we need walk
- * remote update list to find the update, this probably
- * should move to LOD layer, when update can be part of
- * the trancation api parameter. XXX */
- cfs_list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
- if (update->ur_dt == dt_dev)
- return update;
- }
- return NULL;
-}
-
-static inline void osp_md_add_update_batchid(struct update_request *update)
-{
- update->ur_batchid++;
-}
-
-/**
- * Find one loc in th_dev/dev_obj_update for the update,
- * Because only one thread can access this thandle, no need
- * lock now.
- */
-static struct update_request
-*osp_find_create_update_loc(struct thandle *th, struct dt_object *dt)
-{
- struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
- struct update_request *update;
- ENTRY;
-
- update = osp_find_update(th, dt_dev);
- if (update != NULL)
- RETURN(update);
-
- update = osp_create_update_req(dt_dev);
- if (IS_ERR(update))
- RETURN(update);
-
- cfs_list_add_tail(&update->ur_list, &th->th_remote_update_list);
-
- RETURN(update);
-}
-
-static int osp_get_attr_from_req(const struct lu_env *env,
- struct ptlrpc_request *req,
- struct lu_attr *attr, int index)
-{
- struct update_reply *reply;
- struct obdo *lobdo = &osp_env_info(env)->osi_obdo;
- struct obdo *wobdo;
- int size;
-
- LASSERT(attr != NULL);
-
- reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
- UPDATE_BUFFER_SIZE);
- if (reply->ur_version != UPDATE_REPLY_V1)
- return -EPROTO;
-
- size = update_get_reply_buf(reply, (void **)&wobdo, index);
- if (size != sizeof(struct obdo))
- return -EPROTO;
-
- obdo_le_to_cpu(wobdo, wobdo);
- lustre_get_wire_obdo(NULL, lobdo, wobdo);
- la_from_obdo(attr, lobdo, lobdo->o_valid);
-
- return 0;
-}
-
-static int osp_md_declare_object_create(const struct lu_env *env,
- struct dt_object *dt,
- struct lu_attr *attr,
- struct dt_allocation_hint *hint,
- struct dt_object_format *dof,
- struct thandle *th)