X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosp%2Fosp_md_object.c;h=6ff56f4a8af58042f007dbdcf7f027b6e3b3df09;hp=c3805f924ef551b305caa1426550dc47a762ee86;hb=03963106926883cf322e085feb8caa3ea64db1d1;hpb=d51e4a48fd75bd9840bdf5d84701efb8d18fd669 diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index c3805f9..6ff56f4 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -20,1142 +20,1334 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, Intel Corporation. + * Copyright (c) 2013, 2017, Intel Corporation. */ /* * lustre/osp/osp_md_object.c * - * Lustre MDT Proxy Device + * OST/MDT proxy device (OSP) Metadata methods + * + * This file implements methods for remote MD object, which include + * dt_object_operations, dt_index_operations and dt_body_operations. + * + * If there are multiple MDTs in one filesystem, one operation might + * include modifications in several MDTs. In such cases, clients + * send the RPC to the master MDT, then the operation is decomposed into + * object updates which will be dispatched to OSD or OSP. The local updates + * go to local OSD and the remote updates go to OSP. In OSP, these remote + * object updates will be packed into an update RPC, sent to the remote MDT + * and handled by Object Update Target (OUT). + * + * In DNE phase I, because of missing complete recovery solution, updates + * will be executed in order and synchronously. + * 1. The transaction is created. + * 2. In transaction declare, it collects and packs remote + * updates (in osp_md_declare_xxx()). + * 3. In transaction start, it sends these remote updates + * to remote MDTs, which will execute these updates synchronously. + * 4. In transaction execute phase, the local updates will be executed + * synchronously. * * Author: Di Wang */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_MDS +#include #include -#include #include "osp_internal.h" -static const char dot[] = "."; -static const char dotdot[] = ".."; - -static int osp_prep_update_req(const struct lu_env *env, - struct osp_device *osp, - struct update_buf *ubuf, int ubuf_len, - struct ptlrpc_request **reqp) -{ - struct obd_import *imp; - struct ptlrpc_request *req; - struct update_buf *tmp; - int rc; - ENTRY; - - imp = osp->opd_obd->u.cli.cl_import; - LASSERT(imp); - - req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ); - if (req == NULL) - RETURN(-ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT, - UPDATE_BUFFER_SIZE); - - rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ); - if (rc != 0) { - ptlrpc_req_finished(req); - RETURN(rc); - } - - req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER, - UPDATE_BUFFER_SIZE); - - tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE); - memcpy(tmp, ubuf, ubuf_len); - - ptlrpc_request_set_replen(req); - - *reqp = req; - - RETURN(rc); -} - -static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt, - struct update_request *update, - struct ptlrpc_request **reqp) -{ - struct osp_device *osp = dt2osp_dev(dt); - struct ptlrpc_request *req = NULL; - int rc; - ENTRY; - - rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE, - &req); - if (rc) - RETURN(rc); - - /* Note: some dt index api might return non-zero result here, like - * osd_index_ea_lookup, so we should only check rc < 0 here */ - rc = ptlrpc_queue_wait(req); - if (rc < 0) { - ptlrpc_req_finished(req); - update->ur_rc = rc; - RETURN(rc); - } - - if (reqp != NULL) { - *reqp = req; - RETURN(rc); - } - - update->ur_rc = rc; - - ptlrpc_req_finished(req); - RETURN(rc); -} +#define OUT_UPDATE_BUFFER_SIZE_ADD 4096 +#define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1M update size now */ /** - * Create a new update request for the device. + * Interpreter call for object creation + * + * Object creation interpreter, which will be called after creating + * the remote object to set flags and status. + * + * \param[in] env execution environment + * \param[in] reply update reply + * \param[in] req ptlrpc update request for creating object + * \param[in] obj object to be created + * \param[in] data data used in this function. + * \param[in] index index(position) of create update in the whole + * updates + * \param[in] rc update result on the remote MDT. + * + * \retval only return 0 for now */ -static struct update_request -*osp_create_update_req(struct dt_device *dt) +static int osp_create_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) { - struct update_request *update; - - OBD_ALLOC_PTR(update); - if (!update) - return ERR_PTR(-ENOMEM); - - OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE); - if (update->ur_buf == NULL) { - OBD_FREE_PTR(update); - return ERR_PTR(-ENOMEM); + if (rc != 0 && rc != -EEXIST) { + obj->opo_obj.do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; + obj->opo_non_exist = 1; } - CFS_INIT_LIST_HEAD(&update->ur_list); - update->ur_dt = dt; - update->ur_batchid = 0; - update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC; - update->ur_buf->ub_count = 0; - - return update; -} - -static void osp_destroy_update_req(struct update_request *update) -{ - if (update == NULL) - return; - - cfs_list_del(&update->ur_list); - if (update->ur_buf != NULL) - OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE); - - OBD_FREE_PTR(update); - return; -} - -int osp_trans_stop(const struct lu_env *env, struct thandle *th) -{ - int rc = 0; - - rc = th->th_current_request->ur_rc; - osp_destroy_update_req(th->th_current_request); - th->th_current_request = NULL; + /* + * invalidate opo cache for the object after the object is created, so + * attr_get will try to get attr from remote object. + */ + osp_obj_invalidate_cache(obj); - return rc; + return 0; } /** - * In DNE phase I, all remote updates will be packed into RPC (the format - * description is in lustre_idl.h) during declare phase, all of updates - * are attached to the transaction, one entry per OSP. Then in trans start, - * LOD will walk through these entries and send these UPDATEs to the remote - * MDT to be executed synchronously. + * Implementation of dt_object_operations::do_declare_create + * + * Create the osp_update_request to track the update for this OSP + * in the transaction. + * + * \param[in] env execution environment + * \param[in] dt remote object to be created + * \param[in] attr attribute of the created object + * \param[in] hint creation hint + * \param[in] dof creation format information + * \param[in] th the transaction handle + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ -int osp_trans_start(const struct lu_env *env, struct dt_device *dt, - struct thandle *th) +int osp_md_declare_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - struct update_request *update; - int rc = 0; - - /* In phase I, if the transaction includes remote updates, the local - * update should be synchronized, so it will set th_sync = 1 */ - update = th->th_current_request; - LASSERT(update != NULL && update->ur_dt == dt); - if (update->ur_buf->ub_count > 0) { - rc = osp_remote_sync(env, dt, update, NULL); - th->th_sync = 1; - } - - RETURN(rc); + return osp_trans_update_request_create(th); } -/** - * Insert the update into the th_bufs for the device. - */ -static int osp_insert_update(const struct lu_env *env, - struct update_request *update, int op, - struct lu_fid *fid, int count, - int *lens, char **bufs) +struct object_update * +update_buffer_get_update(struct object_update_request *request, + unsigned int index) { - struct update_buf *ubuf = update->ur_buf; - struct update *obj_update; - char *ptr; - int i; - int update_length; - int rc = 0; - ENTRY; - - obj_update = (struct update *)((char *)ubuf + - cfs_size_round(update_buf_size(ubuf))); - - /* Check update size to make sure it can fit into the buffer */ - update_length = cfs_size_round(offsetof(struct update, - u_bufs[0])); - for (i = 0; i < count; i++) - update_length += cfs_size_round(lens[i]); - - if (cfs_size_round(update_buf_size(ubuf)) + update_length > - UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS) { - CERROR("%s: insert up %p, idx %d cnt %d len %lu: rc = %d\n", - update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, - update_length, ubuf->ub_count, update_buf_size(ubuf), - -E2BIG); - RETURN(-E2BIG); - } - - if (count > UPDATE_BUF_COUNT) { - CERROR("%s: Insert too much params %d "DFID" op %d: rc = %d\n", - update->ur_dt->dd_lu_dev.ld_obd->obd_name, count, - PFID(fid), op, -E2BIG); - RETURN(-E2BIG); - } - - /* fill the update into the update buffer */ - fid_cpu_to_le(&obj_update->u_fid, fid); - obj_update->u_type = cpu_to_le32(op); - obj_update->u_batchid = update->ur_batchid; - for (i = 0; i < count; i++) - obj_update->u_lens[i] = cpu_to_le32(lens[i]); - - ptr = (char *)obj_update + - cfs_size_round(offsetof(struct update, u_bufs[0])); - for (i = 0; i < count; i++) - LOGL(bufs[i], lens[i], ptr); + void *ptr; + int i; - ubuf->ub_count++; + if (index > request->ourq_count) + return NULL; - CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n", - update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid), - ubuf->ub_count, op, count, update_buf_size(ubuf)); + ptr = &request->ourq_updates[0]; + for (i = 0; i < index; i++) + ptr += object_update_size(ptr); - RETURN(rc); -} - -static struct update_request -*osp_find_update(struct thandle *th, struct dt_device *dt_dev) -{ - struct update_request *update; - - /* Because transaction api does not proivde the interface - * to transfer the update from LOD to OSP, we need walk - * remote update list to find the update, this probably - * should move to LOD layer, when update can be part of - * the trancation api parameter. XXX */ - cfs_list_for_each_entry(update, &th->th_remote_update_list, ur_list) { - if (update->ur_dt == dt_dev) - return update; - } - return NULL; -} - -static inline void osp_md_add_update_batchid(struct update_request *update) -{ - update->ur_batchid++; + return ptr; } /** - * Find one loc in th_dev/dev_obj_update for the update, - * Because only one thread can access this thandle, no need - * lock now. + * Implementation of dt_object_operations::do_create + * + * It adds an OUT_CREATE sub-request into the OUT RPC that will be flushed + * when the transaction stop, and sets necessary flags for created object. + * + * \param[in] env execution environment + * \param[in] dt object to be created + * \param[in] attr attribute of the created object + * \param[in] hint creation hint + * \param[in] dof creation format information + * \param[in] th the transaction handle + * + * \retval 0 if packing creation succeeds. + * \retval negative errno if packing creation fails. */ -static struct update_request -*osp_find_create_update_loc(struct thandle *th, struct dt_object *dt) -{ - struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - struct update_request *update; - ENTRY; - - update = osp_find_update(th, dt_dev); - if (update != NULL) - RETURN(update); - - update = osp_create_update_req(dt_dev); - if (IS_ERR(update)) - RETURN(update); - - cfs_list_add_tail(&update->ur_list, &th->th_remote_update_list); - - RETURN(update); -} - -static int osp_get_attr_from_req(const struct lu_env *env, - struct ptlrpc_request *req, - struct lu_attr *attr, int index) -{ - struct update_reply *reply; - struct obdo *lobdo = &osp_env_info(env)->osi_obdo; - struct obdo *wobdo; - int size; - - LASSERT(attr != NULL); - - reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY, - UPDATE_BUFFER_SIZE); - if (reply->ur_version != UPDATE_REPLY_V1) - return -EPROTO; - - size = update_get_reply_buf(reply, (void **)&wobdo, index); - if (size != sizeof(struct obdo)) - return -EPROTO; - - obdo_le_to_cpu(wobdo, wobdo); - lustre_get_wire_obdo(NULL, lobdo, wobdo); - la_from_obdo(attr, lobdo, lobdo->o_valid); - - return 0; -} - -static int osp_md_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +int osp_md_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); - struct update_request *update; - struct lu_fid *fid1; - int sizes[2] = {sizeof(struct obdo), 0}; - char *bufs[2] = {NULL, NULL}; - int buf_count; - int rc; + struct osp_update_request *update; + struct osp_object *obj = dt2osp_obj(dt); + int rc; + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); - update = osp_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); + if (!(attr->la_valid & LA_TYPE)) { + CERROR("%s: create type not specified: valid %llx\n", + dt->do_lu.lo_dev->ld_obd->obd_name, attr->la_valid); + GOTO(out, rc = -EINVAL); } - osi->osi_obdo.o_valid = 0; - LASSERT(S_ISDIR(attr->la_mode)); - obdo_from_la(&osi->osi_obdo, attr, attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo); - - bufs[0] = (char *)&osi->osi_obdo; - buf_count = 1; - fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu); - if (hint->dah_parent) { - struct lu_fid *fid2; - struct lu_fid *tmp_fid = &osi->osi_fid; - - fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu); - fid_cpu_to_le(tmp_fid, fid2); - sizes[1] = sizeof(*tmp_fid); - bufs[1] = (char *)tmp_fid; - buf_count++; - } + rc = OSP_UPDATE_RPC_PACK(env, out_create_pack, update, + lu_object_fid(&dt->do_lu), attr, hint, dof); + if (rc != 0) + GOTO(out, rc); - if (lu_object_exists(&dt->do_lu)) { - /* If the object already exists, we needs to destroy - * this orphan object first. - * - * The scenario might happen in this case - * - * 1. client send remote create to MDT0. - * 2. MDT0 send create update to MDT1. - * 3. MDT1 finished create synchronously. - * 4. MDT0 failed and reboot. - * 5. client resend remote create to MDT0. - * 6. MDT0 tries to resend create update to MDT1, - * but find the object already exists - */ - CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n", - dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1)); - - rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0, - NULL, NULL); - if (rc != 0) - GOTO(out, rc); - - if (S_ISDIR(lu_object_attr(&dt->do_lu))) { - /* decrease for ".." */ - rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, - 0, NULL, NULL); - if (rc != 0) - GOTO(out, rc); - } + rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, + osp_create_interpreter); - rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL, - NULL); - if (rc != 0) - GOTO(out, rc); + if (rc < 0) + GOTO(out, rc); - dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; - /* Increase batchid to add this orphan object deletion - * to separate transaction */ - osp_md_add_update_batchid(update); - } + dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); + dt2osp_obj(dt)->opo_non_exist = 0; + obj->opo_stale = 0; - rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes, - bufs); + obj->opo_attr = *attr; out: - if (rc) - CERROR("%s: Insert update error: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, rc); - return rc; } -static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +/** + * Implementation of dt_object_operations::do_declare_ref_del + * + * Create the osp_update_request to track the update for this OSP + * in the transaction. + * + * \param[in] env execution environment + * \param[in] dt object to decrease the reference count. + * \param[in] th the transaction handle of refcount decrease. + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ +static int osp_md_declare_ref_del(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) { - struct osp_object *obj = dt2osp_obj(dt); - - CDEBUG(D_INFO, "create object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); - - /* Because the create update RPC will be sent during declare phase, - * if creation reaches here, it means the object has been created - * successfully */ - dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); - obj->opo_empty = 1; - - return 0; + return osp_trans_update_request_create(th); } -static int osp_md_declare_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +/** + * Implementation of dt_object_operations::do_ref_del + * + * Add an OUT_REF_DEL sub-request into the OUT RPC that will be + * flushed when the transaction stop. + * + * \param[in] env execution environment + * \param[in] dt object to decrease the reference count + * \param[in] th the transaction handle + * + * \retval 0 if packing ref_del succeeds. + * \retval negative errno if packing fails. + */ +static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - struct update_request *update; - struct lu_fid *fid; - int rc; - - update = osp_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + struct osp_update_request *update; + int rc; - rc = osp_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL); + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + rc = OSP_UPDATE_RPC_PACK(env, out_ref_del_pack, update, + lu_object_fid(&dt->do_lu)); return rc; } -static int osp_md_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) -{ - CDEBUG(D_INFO, "ref del object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); - - return 0; -} - +/** + * Implementation of dt_object_operations::do_declare_ref_del + * + * Create the osp_update_request to track the update for this OSP + * in the transaction. + * + * \param[in] env execution environment + * \param[in] dt object on which to increase the reference count. + * \param[in] th the transaction handle. + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ static int osp_md_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct update_request *update; - struct lu_fid *fid; - int rc; - - update = osp_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = osp_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL); - - return rc; + return osp_trans_update_request_create(th); } -static int osp_md_object_ref_add(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +/** + * Implementation of dt_object_operations::do_ref_add + * + * Add an OUT_REF_ADD sub-request into the OUT RPC that will be flushed + * when the transaction stop. + * + * \param[in] env execution environment + * \param[in] dt object on which to increase the reference count + * \param[in] th the transaction handle + * + * \retval 0 if packing ref_add succeeds. + * \retval negative errno if packing fails. + */ +static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - CDEBUG(D_INFO, "ref add object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + struct osp_update_request *update; + int rc; - return 0; + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + rc = OSP_UPDATE_RPC_PACK(env, out_ref_add_pack, update, + lu_object_fid(&dt->do_lu)); + return rc; } +/** + * Implementation of dt_object_operations::do_ah_init + * + * Initialize the allocation hint for object creation, which is usually called + * before the creation, and these hints (parent and child mode) will be sent to + * the remote Object Update Target (OUT) and used in the object create process, + * same as OSD object creation. + * + * \param[in] env execution environment + * \param[in] ah the hint to be initialized + * \param[in] parent the parent of the object + * \param[in] child the object to be created + * \param[in] child_mode the mode of the created object + */ static void osp_md_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah, struct dt_object *parent, struct dt_object *child, - cfs_umode_t child_mode) + umode_t child_mode) { LASSERT(ah); - memset(ah, 0, sizeof(*ah)); ah->dah_parent = parent; - ah->dah_mode = child_mode; } -static int osp_md_declare_attr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_attr *attr, - struct thandle *th) +/** + * Implementation of dt_object_operations::do_declare_attr_get + * + * Create the osp_update_request to track the update for this OSP + * in the transaction. + * + * \param[in] env execution environment + * \param[in] dt object on which to set attributes + * \param[in] attr attributes to be set + * \param[in] th the transaction handle + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ +int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); - struct update_request *update; - struct lu_fid *fid; - int size = sizeof(struct obdo); - char *buf; - int rc; - - update = osp_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - osi->osi_obdo.o_valid = 0; - LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE))); - obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr, - attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo); + return osp_trans_update_request_create(th); +} - buf = (char *)&osi->osi_obdo; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); +/** + * Implementation of dt_object_operations::do_attr_set + * + * Set attributes to the specified remote object. + * + * Add the OUT_ATTR_SET sub-request into the OUT RPC that will be flushed + * when the transaction stop. + * + * \param[in] env execution environment + * \param[in] dt object to set attributes + * \param[in] attr attributes to be set + * \param[in] th the transaction handle + * + * \retval 0 if packing attr_set succeeds. + * \retval negative errno if packing fails. + */ +int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) +{ + struct osp_update_request *update; + int rc; - rc = osp_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size, &buf); + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + rc = OSP_UPDATE_RPC_PACK(env, out_attr_set_pack, update, + lu_object_fid(&dt->do_lu), attr); return rc; } -static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th, - struct lustre_capa *capa) +/** + * Implementation of dt_object_operations::do_read_lock + * + * osp_md_{read,write}_lock() will only lock the remote object in the + * local cache, which uses the semaphore (opo_sem) inside the osp_object to + * lock the object. Note: it will not lock the object in the whole cluster, + * which relies on the LDLM lock. + * + * \param[in] env execution environment + * \param[in] dt object to be locked + * \param[in] role lock role from MDD layer, see dt_object_role(). + */ +static void osp_md_read_lock(const struct lu_env *env, struct dt_object *dt, + unsigned role) { - CDEBUG(D_INFO, "attr set object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + struct osp_object *obj = dt2osp_obj(dt); - RETURN(0); + LASSERT(obj->opo_owner != env); + down_read_nested(&obj->opo_sem, role); + + LASSERT(obj->opo_owner == NULL); } -static int osp_md_declare_xattr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int flag, - struct thandle *th) +/** + * Implementation of dt_object_operations::do_write_lock + * + * Lock the remote object in write mode. + * + * \param[in] env execution environment + * \param[in] dt object to be locked + * \param[in] role lock role from MDD layer, see dt_object_role(). + */ +static void osp_md_write_lock(const struct lu_env *env, struct dt_object *dt, + unsigned role) { - struct update_request *update; - struct lu_fid *fid; - int sizes[3] = {strlen(name), buf->lb_len, - sizeof(int)}; - char *bufs[3] = {(char *)name, (char *)buf->lb_buf }; - int rc; + struct osp_object *obj = dt2osp_obj(dt); - LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL); - update = osp_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + down_write_nested(&obj->opo_sem, role); + + LASSERT(obj->opo_owner == NULL); + obj->opo_owner = env; +} - flag = cpu_to_le32(flag); - bufs[2] = (char *)&flag; +/** + * Implementation of dt_object_operations::do_read_unlock + * + * Unlock the read lock of remote object. + * + * \param[in] env execution environment + * \param[in] dt object to be unlocked + */ +static void osp_md_read_unlock(const struct lu_env *env, struct dt_object *dt) +{ + struct osp_object *obj = dt2osp_obj(dt); - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = osp_insert_update(env, update, OBJ_XATTR_SET, fid, - ARRAY_SIZE(sizes), sizes, bufs); + up_read(&obj->opo_sem); +} - return rc; +/** + * Implementation of dt_object_operations::do_write_unlock + * + * Unlock the write lock of remote object. + * + * \param[in] env execution environment + * \param[in] dt object to be unlocked + */ +static void osp_md_write_unlock(const struct lu_env *env, struct dt_object *dt) +{ + struct osp_object *obj = dt2osp_obj(dt); + + LASSERT(obj->opo_owner == env); + obj->opo_owner = NULL; + up_write(&obj->opo_sem); } -static int osp_md_xattr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, const char *name, int fl, - struct thandle *th, struct lustre_capa *capa) +/** + * Implementation of dt_object_operations::do_write_locked + * + * Test if the object is locked in write mode. + * + * \param[in] env execution environment + * \param[in] dt object to be tested + */ +static int osp_md_write_locked(const struct lu_env *env, struct dt_object *dt) { - CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name, - PFID(&dt->do_lu.lo_header->loh_fid)); + struct osp_object *obj = dt2osp_obj(dt); - return 0; + return obj->opo_owner == env; } -static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, const char *name, - struct lustre_capa *capa) +/** + * Implementation of dt_index_operations::dio_lookup + * + * Look up record by key under a remote index object. It packs lookup update + * into RPC, sends to the remote OUT and waits for the lookup result. + * + * \param[in] env execution environment + * \param[in] dt index object to lookup + * \param[out] rec record in which to return lookup result + * \param[in] key key of index which will be looked up + * + * \retval 1 if the lookup succeeds. + * \retval negative errno if the lookup fails. + */ +static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key) { - struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - struct update_request *update = NULL; - struct ptlrpc_request *req = NULL; - int rc; - int buf_len; - int size; - struct update_reply *reply; - void *ea_buf; + struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct dt_device *dt_dev = &osp->opd_dt_dev; + struct osp_update_request *update; + struct object_update_reply *reply; + struct ptlrpc_request *req = NULL; + struct lu_fid *fid; + int rc; ENTRY; /* Because it needs send the update buffer right away, * just create an update buffer, instead of attaching the * update_remote list of the thandle. */ - update = osp_create_update_req(dt_dev); + update = osp_update_request_create(dt_dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); - LASSERT(name != NULL); - buf_len = strlen(name); - rc = osp_insert_update(env, update, OBJ_XATTR_GET, - (struct lu_fid *)lu_object_fid(&dt->do_lu), - 1, &buf_len, (char **)&name); + rc = OSP_UPDATE_RPC_PACK(env, out_index_lookup_pack, update, + lu_object_fid(&dt->do_lu), rec, key); if (rc != 0) { CERROR("%s: Insert update error: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, rc); + dt_dev->dd_lu_dev.ld_obd->obd_name, rc); GOTO(out, rc); } - dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - rc = osp_remote_sync(env, dt_dev, update, &req); - if (rc != 0) + rc = osp_remote_sync(env, osp, update, &req); + if (rc < 0) GOTO(out, rc); - reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY, - UPDATE_BUFFER_SIZE); - if (reply->ur_version != UPDATE_REPLY_V1) { + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { CERROR("%s: Wrong version %x expected %x: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, - reply->ur_version, UPDATE_REPLY_V1, -EPROTO); + reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO); GOTO(out, rc = -EPROTO); } - size = update_get_reply_buf(reply, &ea_buf, 0); - if (size < 0) - GOTO(out, rc = size); + rc = object_update_result_data_get(reply, lbuf, 0); + if (rc < 0) + GOTO(out, rc); - LASSERT(size > 0 && size < CFS_PAGE_SIZE); - LASSERT(ea_buf != NULL); + if (lbuf->lb_len != sizeof(*fid)) { + CERROR("%s: lookup "DFID" %s wrong size %d\n", + dt_dev->dd_lu_dev.ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), (char *)key, + (int)lbuf->lb_len); + GOTO(out, rc = -EINVAL); + } - rc = size; - if (buf->lb_buf != NULL) - memcpy(buf->lb_buf, ea_buf, size); -out: + fid = lbuf->lb_buf; + if (req_capsule_rep_need_swab(&req->rq_pill)) + lustre_swab_lu_fid(fid); + if (!fid_is_sane(fid)) { + CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n", + dt_dev->dd_lu_dev.ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid)); + GOTO(out, rc = -EINVAL); + } + + memcpy(rec, fid, sizeof(*fid)); + + GOTO(out, rc = 1); + +out: if (req != NULL) ptlrpc_req_finished(req); - if (update != NULL) - osp_destroy_update_req(update); + osp_update_request_destroy(env, update); - RETURN(rc); + return rc; } -static void osp_md_object_read_lock(const struct lu_env *env, - struct dt_object *dt, unsigned role) +/** + * Implementation of dt_index_operations::dio_declare_insert + * + * Create the osp_update_request to track the update for this OSP + * in the transaction. + * + * \param[in] env execution environment + * \param[in] dt object for which to insert index + * \param[in] rec record of the index which will be inserted + * \param[in] key key of the index which will be inserted + * \param[in] th the transaction handle + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ +static int osp_md_declare_index_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *th) { - struct osp_object *obj = dt2osp_obj(dt); + return osp_trans_update_request_create(th); +} - LASSERT(obj->opo_owner != env); - down_read_nested(&obj->opo_sem, role); +/** + * Implementation of dt_index_operations::dio_insert + * + * Add an OUT_INDEX_INSERT sub-request into the OUT RPC that will + * be flushed when the transaction stop. + * + * \param[in] env execution environment + * \param[in] dt object for which to insert index + * \param[in] rec record of the index to be inserted + * \param[in] key key of the index to be inserted + * \param[in] th the transaction handle + * + * \retval 0 if packing index insert succeeds. + * \retval negative errno if packing fails. + */ +static int osp_md_index_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, struct thandle *th) +{ + struct osp_update_request *update; + int rc; - LASSERT(obj->opo_owner == NULL); + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + rc = OSP_UPDATE_RPC_PACK(env, out_index_insert_pack, update, + lu_object_fid(&dt->do_lu), rec, key); + return rc; } -static void osp_md_object_write_lock(const struct lu_env *env, - struct dt_object *dt, unsigned role) +/** + * Implementation of dt_index_operations::dio_declare_delete + * + * Create the osp_update_request to track the update for this OSP + * in the transaction. + * + * \param[in] env execution environment + * \param[in] dt object for which to delete index + * \param[in] key key of the index + * \param[in] th the transaction handle + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ +static int osp_md_declare_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th) { - struct osp_object *obj = dt2osp_obj(dt); + return osp_trans_update_request_create(th); +} - down_write_nested(&obj->opo_sem, role); +/** + * Implementation of dt_index_operations::dio_delete + * + * Add an OUT_INDEX_DELETE sub-request into the OUT RPC that will + * be flushed when the transaction stop. + * + * \param[in] env execution environment + * \param[in] dt object for which to delete index + * \param[in] key key of the index which will be deleted + * \param[in] th the transaction handle + * + * \retval 0 if packing index delete succeeds. + * \retval negative errno if packing fails. + */ +static int osp_md_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th) +{ + struct osp_update_request *update; + int rc; - LASSERT(obj->opo_owner == NULL); - obj->opo_owner = env; + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + rc = OSP_UPDATE_RPC_PACK(env, out_index_delete_pack, update, + lu_object_fid(&dt->do_lu), key); + + return rc; } -static void osp_md_object_read_unlock(const struct lu_env *env, - struct dt_object *dt) +/** + * Implementation of dt_index_operations::dio_it.next + * + * Advance the pointer of the iterator to the next entry. It shares a similar + * internal implementation with osp_orphan_it_next(), which is being used for + * remote orphan index object. This method will be used for remote directory. + * + * \param[in] env execution environment + * \param[in] di iterator of this iteration + * + * \retval 0 if the pointer is advanced successfully. + * \retval 1 if it reaches to the end of the index object. + * \retval negative errno if the pointer cannot be advanced. + */ +static int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di) { - struct osp_object *obj = dt2osp_obj(dt); + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + int rc; + ENTRY; - up_read(&obj->opo_sem); +again: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + it->ooi_pos_ent++; + if (ent == NULL) { + it->ooi_ent = + (struct lu_dirent *)idxpage->lip_entries; + RETURN(0); + } else if (le16_to_cpu(ent->lde_reclen) != 0 && + it->ooi_pos_ent < idxpage->lip_nr) { + ent = (struct lu_dirent *)(((char *)ent) + + le16_to_cpu(ent->lde_reclen)); + it->ooi_ent = ent; + RETURN(0); + } else { + it->ooi_ent = NULL; + } + } + + rc = osp_it_next_page(env, di); + if (rc == 0) + goto again; + + RETURN(rc); } -static void osp_md_object_write_unlock(const struct lu_env *env, - struct dt_object *dt) +/** + * Implementation of dt_index_operations::dio_it.key + * + * Get the key at current iterator poisiton. These iteration methods + * (dio_it) will only be used for iterating the remote directory, so + * the key is the name of the directory entry. + * + * \param[in] env execution environment + * \param[in] di iterator of this iteration + * + * \retval name of the current entry + */ +static struct dt_key *osp_it_key(const struct lu_env *env, + const struct dt_it *di) { - struct osp_object *obj = dt2osp_obj(dt); + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; - LASSERT(obj->opo_owner == env); - obj->opo_owner = NULL; - up_write(&obj->opo_sem); + return (struct dt_key *)ent->lde_name; } -static int osp_md_object_write_locked(const struct lu_env *env, - struct dt_object *dt) +/** + * Implementation of dt_index_operations::dio_it.key_size + * + * Get the key size at current iterator poisiton. These iteration methods + * (dio_it) will only be used for iterating the remote directory, so the key + * size is the name size of the directory entry. + * + * \param[in] env execution environment + * \param[in] di iterator of this iteration + * + * \retval name size of the current entry + */ + +static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di) { - struct osp_object *obj = dt2osp_obj(dt); + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; - return obj->opo_owner == env; + return (int)le16_to_cpu(ent->lde_namelen); } -static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) +/** + * Implementation of dt_index_operations::dio_it.rec + * + * Get the record at current iterator position. These iteration methods + * (dio_it) will only be used for iterating the remote directory, so it + * uses lu_dirent_calc_size() to calculate the record size. + * + * \param[in] env execution environment + * \param[in] di iterator of this iteration + * \param[out] rec the record to be returned + * \param[in] attr attributes of the index object, so it knows + * how to pack the entry. + * + * \retval only return 0 for now + */ +static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) { - struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - struct update_request *update; - struct ptlrpc_request *req = NULL; - int size = strlen((char *)key) + 1; - char *name = (char *)key; - int rc; - struct update_reply *reply; - struct lu_fid *fid; + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + size_t reclen; + + reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr); + memcpy(rec, ent, reclen); + return 0; +} + +/** + * Implementation of dt_index_operations::dio_it.load + * + * Locate the iteration cursor to the specified position (cookie). + * + * \param[in] env pointer to the thread context + * \param[in] di pointer to the iteration structure + * \param[in] hash the specified position + * + * \retval positive number for locating to the exactly position + * or the next + * \retval 0 for arriving at the end of the iteration + * \retval negative error number on failure + */ +static int osp_it_load(const struct lu_env *env, const struct dt_it *di, + __u64 hash) +{ + struct osp_it *it = (struct osp_it *)di; + int rc; + + it->ooi_next = hash; + rc = osp_md_index_it_next(env, (struct dt_it *)di); + if (rc == 1) + return 0; + + if (rc == 0) + return 1; + + return rc; +} + +const struct dt_index_operations osp_md_index_ops = { + .dio_lookup = osp_md_index_lookup, + .dio_declare_insert = osp_md_declare_index_insert, + .dio_insert = osp_md_index_insert, + .dio_declare_delete = osp_md_declare_index_delete, + .dio_delete = osp_md_index_delete, + .dio_it = { + .init = osp_it_init, + .fini = osp_it_fini, + .get = osp_it_get, + .put = osp_it_put, + .next = osp_md_index_it_next, + .key = osp_it_key, + .key_size = osp_it_key_size, + .rec = osp_md_index_it_rec, + .store = osp_it_store, + .load = osp_it_load, + .key_rec = osp_it_key_rec, + } +}; + +/** + * Implement OSP layer dt_object_operations::do_xattr_list() interface. + * + * List extended attribute from the specified MDT/OST object, result is not + * cached because this is called by directory migration only. + * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object + * \param[out] buf pointer to the lu_buf to hold the extended attribute + * + * \retval positive bytes used/required in the buffer + * \retval negative error number on failure + */ +static int osp_md_xattr_list(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf) +{ + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct osp_object *obj = dt2osp_obj(dt); + struct dt_device *dev = &osp->opd_dt_dev; + struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; + struct osp_update_request *update = NULL; + struct ptlrpc_request *req = NULL; + struct object_update_reply *reply; + const char *dname = dt->do_lu.lo_dev->ld_obd->obd_name; + int rc = 0; ENTRY; - /* Because it needs send the update buffer right away, - * just create an update buffer, instead of attaching the - * update_remote list of the thandle. - */ - update = osp_create_update_req(dt_dev); + LASSERT(buf); + + if (unlikely(obj->opo_non_exist)) + RETURN(-ENOENT); + + update = osp_update_request_create(dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = osp_insert_update(env, update, OBJ_INDEX_LOOKUP, - (struct lu_fid *)lu_object_fid(&dt->do_lu), - 1, &size, (char **)&name); + rc = OSP_UPDATE_RPC_PACK(env, out_xattr_list_pack, update, + lu_object_fid(&dt->do_lu), buf->lb_len); if (rc) { - CERROR("%s: Insert update error: rc = %d\n", - dt_dev->dd_lu_dev.ld_obd->obd_name, rc); + CERROR("%s: Insert update error "DFID": rc = %d\n", + dname, PFID(lu_object_fid(&dt->do_lu)), rc); GOTO(out, rc); } - rc = osp_remote_sync(env, dt_dev, update, &req); + rc = osp_remote_sync(env, osp, update, &req); if (rc < 0) { - CERROR("%s: lookup "DFID" %s failed: rc = %d\n", - dt_dev->dd_lu_dev.ld_obd->obd_name, - PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc); + if (rc == -ENOENT) { + dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; + obj->opo_non_exist = 1; + } GOTO(out, rc); } - reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY, - UPDATE_BUFFER_SIZE); - if (reply->ur_version != UPDATE_REPLY_V1) { - CERROR("%s: Wrong version %x expected %x: rc = %d\n", - dt_dev->dd_lu_dev.ld_obd->obd_name, - reply->ur_version, UPDATE_REPLY_V1, -EPROTO); + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { + DEBUG_REQ(D_ERROR, req, + "%s: Wrong version %x expected %x "DFID": rc = %d", + dname, reply->ourp_magic, UPDATE_REPLY_MAGIC, + PFID(lu_object_fid(&dt->do_lu)), -EPROTO); + GOTO(out, rc = -EPROTO); } - rc = update_get_reply_result(reply, NULL, 0); - if (rc < 0) { - CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n", - dt_dev->dd_lu_dev.ld_obd->obd_name, - PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc); + rc = object_update_result_data_get(reply, rbuf, 0); + if (rc < 0) GOTO(out, rc); - } - size = update_get_reply_buf(reply, (void **)&fid, 0); - if (size < 0) - GOTO(out, rc = size); + if (!buf->lb_buf) + GOTO(out, rc); - if (size != sizeof(struct lu_fid)) { - CERROR("%s: lookup "DFID" %s wrong size %d: rc = %d\n", - dt_dev->dd_lu_dev.ld_obd->obd_name, - PFID(lu_object_fid(&dt->do_lu)), (char *)key, size, rc); - GOTO(out, rc = -EINVAL); - } + if (unlikely(buf->lb_len < rbuf->lb_len)) + GOTO(out, rc = -ERANGE); + + memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len); + EXIT; - fid_le_to_cpu(fid, fid); - if (!fid_is_sane(fid)) { - CERROR("%s: lookup "DFID" %s invalid fid "DFID": rc = %d\n", - dt_dev->dd_lu_dev.ld_obd->obd_name, - PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid), - rc); - GOTO(out, rc = -EINVAL); - } - memcpy(rec, fid, sizeof(*fid)); out: - if (req != NULL) + if (req) ptlrpc_req_finished(req); - if (update != NULL) - osp_destroy_update_req(update); - - RETURN(rc); -} - -static int osp_md_declare_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th) -{ - struct update_request *update; - struct lu_fid *fid; - struct lu_fid *rec_fid = (struct lu_fid *)rec; - int size[2] = {strlen((char *)key) + 1, - sizeof(*rec_fid)}; - char *bufs[2] = {(char *)key, (char *)rec_fid}; - int rc; - - update = osp_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - PFID(fid), (char *)key, PFID(rec_fid)); - - fid_cpu_to_le(rec_fid, rec_fid); + if (update && !IS_ERR(update)) + osp_update_request_destroy(env, update); - rc = osp_insert_update(env, update, OBJ_INDEX_INSERT, fid, - ARRAY_SIZE(size), size, bufs); return rc; } -static int osp_md_index_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th, - struct lustre_capa *capa, - int ignore_quota) +/** + * Implementation of dt_object_operations::do_index_try + * + * Try to initialize the index API pointer for the given object. This + * is the entry point of the index API, i.e. we must call this method + * to initialize the index object before calling other index methods. + * + * \param[in] env execution environment + * \param[in] dt index object to be initialized + * \param[in] feat the index feature of the object + * + * \retval 0 if the initialization succeeds. + * \retval negative errno if the initialization fails. + */ +static int osp_md_index_try(const struct lu_env *env, + struct dt_object *dt, + const struct dt_index_features *feat) { + dt->do_index_ops = &osp_md_index_ops; return 0; } -static int osp_md_declare_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *th) +/** + * Implementation of dt_object_operations::do_object_lock + * + * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT, + * which will lock the object in the global namespace. And because the + * cross-MDT locks are relatively rare compared with normal local MDT operation, + * let's release it right away, instead of putting it into the LRU list. + * + * \param[in] env execution environment + * \param[in] dt object to be locked + * \param[out] lh lock handle + * \param[in] einfo enqueue information + * \param[in] policy lock policy + * + * \retval ELDLM_OK if locking the object succeeds. + * \retval negative errno if locking fails. + */ +static int osp_md_object_lock(const struct lu_env *env, + struct dt_object *dt, + struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + union ldlm_policy_data *policy) { - struct update_request *update; - struct lu_fid *fid; - int size = strlen((char *)key) + 1; - char *buf = (char *)key; - int rc; + struct ldlm_res_id *res_id; + struct osp_device *osp = dt2osp_dev(lu2dt_dev(dt->do_lu.lo_dev)); + struct ptlrpc_request *req; + int rc = 0; + __u64 flags = LDLM_FL_NO_LRU; + ENTRY; - update = osp_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + res_id = einfo->ei_res_id; + LASSERT(res_id != NULL); - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + if (einfo->ei_mode & (LCK_EX | LCK_PW)) + flags |= LDLM_FL_COS_INCOMPAT; - rc = osp_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size, - &buf); + req = ldlm_enqueue_pack(osp->opd_exp, 0); + if (IS_ERR(req)) + RETURN(PTR_ERR(req)); - return rc; -} + osp_set_req_replay(osp, req); + rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id, + (const union ldlm_policy_data *)policy, + &flags, NULL, 0, LVB_T_NONE, lh, 0); -static int osp_md_index_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *th, - struct lustre_capa *capa) -{ - CDEBUG(D_INFO, "index delete "DFID" %s\n", - PFID(&dt->do_lu.lo_header->loh_fid), (char *)key); + ptlrpc_req_finished(req); - return 0; + RETURN(rc == ELDLM_OK ? 0 : -EIO); } /** - * Creates or initializes iterator context. - * - * Note: for OSP, these index iterate api is only used to check - * whether the directory is empty now (see mdd_dir_is_empty). - * Since dir_empty will be return by OBJ_ATTR_GET(see osp_md_attr_get/ - * out_attr_get). So the implementation of these iterator is simplied - * to make mdd_dir_is_empty happy. The real iterator should be - * implemented, if we need it one day. + * Implementation of dt_object_operations::do_object_unlock + * + * Cancel a lock of a remote object. + * + * \param[in] env execution environment + * \param[in] dt object to be unlocked + * \param[in] einfo lock enqueue information + * \param[in] policy lock policy + * + * \retval Only return 0 for now. */ -static struct dt_it *osp_it_init(const struct lu_env *env, - struct dt_object *dt, - __u32 attr, - struct lustre_capa *capa) +static int osp_md_object_unlock(const struct lu_env *env, + struct dt_object *dt, + struct ldlm_enqueue_info *einfo, + union ldlm_policy_data *policy) { - lu_object_get(&dt->do_lu); - return (struct dt_it *)dt; -} + struct lustre_handle *lockh = einfo->ei_cbdata; -static void osp_it_fini(const struct lu_env *env, struct dt_it *di) -{ - struct dt_object *dt = (struct dt_object *)di; - lu_object_put(env, &dt->do_lu); + /* unlock finally */ + ldlm_lock_decref(lockh, einfo->ei_mode); + + return 0; } -static int osp_it_get(const struct lu_env *env, - struct dt_it *di, const struct dt_key *key) +/** + * Implement OSP layer dt_object_operations::do_declare_destroy() interface. + * + * Create the dt_update_request to track the update for this OSP + * in the transaction. + * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object to be destroyed + * \param[in] th pointer to the transaction handler + * + * \retval 0 for success + * \retval negative error number on failure + */ +int osp_md_declare_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - return 1; + return osp_trans_update_request_create(th); } -static void osp_it_put(const struct lu_env *env, struct dt_it *di) +static int osp_destroy_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) { - return; + return 0; } -static int osp_it_next(const struct lu_env *env, struct dt_it *di) +/** + * Implement OSP layer dt_object_operations::do_destroy() interface. + * + * Pack the destroy update into the RPC buffer, which will be sent + * to the remote MDT during transaction stop. + * + * It also marks the object as non-cached. + * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object to be destroyed + * \param[in] th pointer to the transaction handler + * + * \retval 0 for success + * \retval negative error number on failure + */ +int osp_md_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - struct dt_object *dt = (struct dt_object *)di; struct osp_object *o = dt2osp_obj(dt); + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct osp_update_request *update; + int rc = 0; + ENTRY; - if (o->opo_empty) - return 1; + o->opo_non_exist = 1; + o->opo_destroyed = 1; - return 0; -} + LASSERT(osp->opd_connect_mdt); + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); -static struct dt_key *osp_it_key(const struct lu_env *env, - const struct dt_it *di) -{ - LBUG(); - return NULL; -} + rc = OSP_UPDATE_RPC_PACK(env, out_destroy_pack, update, + lu_object_fid(&dt->do_lu)); + if (rc != 0) + RETURN(rc); -static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di) -{ - LBUG(); - return 0; -} + /* retain the object and it's status until it's destroyed on remote */ + rc = osp_insert_update_callback(env, update, o, NULL, + osp_destroy_interpreter); + if (rc != 0) + RETURN(rc); -static int osp_it_rec(const struct lu_env *env, const struct dt_it *di, - struct dt_rec *lde, __u32 attr) -{ - LBUG(); - return 0; + set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); + rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, + NULL); + + RETURN(rc); } -static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di) +const struct dt_object_operations osp_md_obj_ops = { + .do_read_lock = osp_md_read_lock, + .do_write_lock = osp_md_write_lock, + .do_read_unlock = osp_md_read_unlock, + .do_write_unlock = osp_md_write_unlock, + .do_write_locked = osp_md_write_locked, + .do_declare_create = osp_md_declare_create, + .do_create = osp_md_create, + .do_declare_ref_add = osp_md_declare_ref_add, + .do_ref_add = osp_md_ref_add, + .do_declare_ref_del = osp_md_declare_ref_del, + .do_ref_del = osp_md_ref_del, + .do_declare_destroy = osp_md_declare_destroy, + .do_destroy = osp_md_destroy, + .do_ah_init = osp_md_ah_init, + .do_attr_get = osp_attr_get, + .do_declare_attr_set = osp_md_declare_attr_set, + .do_attr_set = osp_md_attr_set, + .do_xattr_get = osp_xattr_get, + .do_xattr_list = osp_md_xattr_list, + .do_declare_xattr_set = osp_declare_xattr_set, + .do_xattr_set = osp_xattr_set, + .do_declare_xattr_del = osp_declare_xattr_del, + .do_xattr_del = osp_xattr_del, + .do_index_try = osp_md_index_try, + .do_object_lock = osp_md_object_lock, + .do_object_unlock = osp_md_object_unlock, + .do_invalidate = osp_invalidate, + .do_check_stale = osp_check_stale, +}; + +/** + * Implementation of dt_body_operations::dbo_declare_write + * + * Create the osp_update_request to track the update for this OSP + * in the transaction. + * + * \param[in] env execution environment + * \param[in] dt object to be written + * \param[in] buf buffer to write which includes an embedded size field + * \param[in] pos offet in the object to start writing at + * \param[in] th transaction handle + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ +static ssize_t osp_md_declare_write(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + loff_t pos, struct thandle *th) { - LBUG(); + struct osp_device *osp = dt2osp_dev(th->th_dev); + int rc; + + if (dt2osp_obj(dt)->opo_destroyed) + return -ENOENT; + + rc = osp_trans_update_request_create(th); + if (rc != 0) + return rc; + + if (osp->opd_update == NULL) + return 0; + + if (dt2osp_obj(dt)->opo_stale) + return -ESTALE; + return 0; } -static int osp_it_load(const struct lu_env *env, const struct dt_it *di, - __u64 hash) +static int osp_write_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) { - LBUG(); + if (rc) { + CDEBUG(D_HA, "error "DFID": rc = %d\n", + PFID(lu_object_fid(&obj->opo_obj.do_lu)), rc); + OBD_RACE(OBD_FAIL_OUT_OBJECT_MISS); + spin_lock(&obj->opo_lock); + obj->opo_attr.la_valid = 0; + obj->opo_stale = 1; + spin_unlock(&obj->opo_lock); + } return 0; } -static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, - void *key_rec) +/** + * Implementation of dt_body_operations::dbo_write + * + * Pack the write object update into the RPC buffer, which will be sent + * to the remote MDT during transaction stop. + * + * \param[in] env execution environment + * \param[in] dt object to be written + * \param[in] buf buffer to write which includes an embedded size field + * \param[in] pos offet in the object to start writing at + * \param[in] th transaction handle + * + * \retval the buffer size in bytes if packing succeeds. + * \retval negative errno if packing fails. + */ +static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, loff_t *pos, + struct thandle *th) { - LBUG(); - return 0; -} + struct osp_object *obj = dt2osp_obj(dt); + struct osp_update_request *update; + struct osp_thandle *oth = thandle_to_osp_thandle(th); + ssize_t rc; + ENTRY; -static const struct dt_index_operations osp_md_index_ops = { - .dio_lookup = osp_md_index_lookup, - .dio_declare_insert = osp_md_declare_insert, - .dio_insert = osp_md_index_insert, - .dio_declare_delete = osp_md_declare_delete, - .dio_delete = osp_md_index_delete, - .dio_it = { - .init = osp_it_init, - .fini = osp_it_fini, - .get = osp_it_get, - .put = osp_it_put, - .next = osp_it_next, - .key = osp_it_key, - .key_size = osp_it_key_size, - .rec = osp_it_rec, - .store = osp_it_store, - .load = osp_it_load, - .key_rec = osp_it_key_rec, + if (obj->opo_destroyed) + RETURN(-ENOENT); + + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + CDEBUG(D_INFO, "write "DFID" offset = %llu length = %zu\n", + PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len); + + rc = OSP_UPDATE_RPC_PACK(env, out_write_pack, update, + lu_object_fid(&dt->do_lu), buf, *pos); + if (rc < 0) + RETURN(rc); + + rc = osp_check_and_set_rpc_version(oth, obj); + if (rc < 0) + RETURN(rc); + + /* to be able to invalidate object's state in case of an error */ + rc = osp_insert_update_callback(env, update, obj, NULL, + osp_write_interpreter); + if (rc < 0) + RETURN(rc); + + /* XXX: how about the write error happened later? */ + *pos += buf->lb_len; + + if (obj->opo_attr.la_valid & LA_SIZE && obj->opo_attr.la_size < *pos) + obj->opo_attr.la_size = *pos; + + spin_lock(&obj->opo_lock); + if (list_empty(&obj->opo_invalidate_cb_list)) { + lu_object_get(&obj->opo_obj.do_lu); + + list_add_tail(&obj->opo_invalidate_cb_list, + &update->our_invalidate_cb_list); } -}; + spin_unlock(&obj->opo_lock); -static int osp_md_index_try(const struct lu_env *env, - struct dt_object *dt, - const struct dt_index_features *feat) + RETURN(buf->lb_len); +} + +static inline void orr_le_to_cpu(struct out_read_reply *orr_dst, + const struct out_read_reply *orr_src) { - dt->do_index_ops = &osp_md_index_ops; - return 0; + orr_dst->orr_size = le32_to_cpu(orr_src->orr_size); + orr_dst->orr_padding = le32_to_cpu(orr_src->orr_padding); + orr_dst->orr_offset = le64_to_cpu(orr_dst->orr_offset); } -static int osp_md_attr_get(const struct lu_env *env, - struct dt_object *dt, struct lu_attr *attr, - struct lustre_capa *capa) + + +static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, + struct lu_buf *rbuf, loff_t *pos) { - struct osp_object *obj = dt2osp_obj(dt); - struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - struct update_request *update = NULL; + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct dt_device *dt_dev = &osp->opd_dt_dev; + struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; + char *ptr = rbuf->lb_buf; + struct osp_update_request *update; struct ptlrpc_request *req = NULL; + struct out_read_reply *orr; + struct ptlrpc_bulk_desc *desc; + struct object_update_reply *reply; + int pages; int rc; ENTRY; + if (dt2osp_obj(dt)->opo_destroyed) + RETURN(-ENOENT); + /* Because it needs send the update buffer right away, * just create an update buffer, instead of attaching the - * update_remote list of the thandle. - */ - update = osp_create_update_req(dt_dev); + * update_remote list of the thandle. */ + update = osp_update_request_create(dt_dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = osp_insert_update(env, update, OBJ_ATTR_GET, - (struct lu_fid *)lu_object_fid(&dt->do_lu), - 0, NULL, NULL); - if (rc) { - CERROR("%s: Insert update error: rc = %d\n", + rc = OSP_UPDATE_RPC_PACK(env, out_read_pack, update, + lu_object_fid(&dt->do_lu), + rbuf->lb_len, *pos); + if (rc != 0) { + CERROR("%s: cannot insert update: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, rc); - GOTO(out, rc); + GOTO(out_update, rc); } - dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - - rc = osp_remote_sync(env, dt_dev, update, &req); - if (rc < 0) - GOTO(out, rc); - - rc = osp_get_attr_from_req(env, req, attr, 0); - if (rc) - GOTO(out, rc); - - if (attr->la_flags == 1) - obj->opo_empty = 0; - else - obj->opo_empty = 1; -out: - if (req != NULL) - ptlrpc_req_finished(req); - if (update != NULL) - osp_destroy_update_req(update); - - RETURN(rc); -} - -static int osp_md_declare_object_destroy(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) -{ - struct osp_object *o = dt2osp_obj(dt); - int rc = 0; - ENTRY; + CDEBUG(D_INFO, "%s "DFID" read offset %llu size %zu\n", + dt_dev->dd_lu_dev.ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), *pos, rbuf->lb_len); + rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update, + &req); + if (rc != 0) + GOTO(out_update, rc); - /* - * track objects to be destroyed via llog - */ - rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); + /* First *and* last might be partial pages, hence +1 */ + pages = DIV_ROUND_UP(rbuf->lb_len, PAGE_SIZE) + 1; - RETURN(rc); -} + /* allocate bulk descriptor */ + desc = ptlrpc_prep_bulk_imp(req, pages, 1, + PTLRPC_BULK_PUT_SINK, + MDS_BULK_PORTAL, + &ptlrpc_bulk_kiov_nopin_ops); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); -static int osp_md_object_destroy(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) -{ - struct osp_object *o = dt2osp_obj(dt); - int rc = 0; - ENTRY; + desc->bd_frag_ops->add_iov_frag(desc, ptr, rbuf->lb_len); - /* - * once transaction is committed put proper command on - * the queue going to our OST - */ - rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL); + osp_set_req_replay(osp, req); + req->rq_bulk_read = 1; + /* send request to master and wait for RPC to complete */ + rc = ptlrpc_queue_wait(req); + if (rc != 0) + GOTO(out, rc); - /* not needed in cache any more */ - set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) + GOTO(out, rc); - RETURN(rc); -} + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); -static int osp_md_object_lock(const struct lu_env *env, - struct dt_object *dt, - struct lustre_handle *lh, - struct ldlm_enqueue_info *einfo, - void *policy) -{ - struct osp_thread_info *info = osp_env_info(env); - struct ldlm_res_id *res_id = &info->osi_resid; - struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - struct osp_device *osp = dt2osp_dev(dt_dev); - struct ptlrpc_request *req = NULL; - int rc = 0; - __u64 flags = 0; - ldlm_mode_t mode; - - fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id); - - mode = ldlm_lock_match(osp->opd_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, res_id, - einfo->ei_type, - (ldlm_policy_data_t *)policy, - einfo->ei_mode, lh, 0); - if (mode > 0) - return ELDLM_OK; + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { + CERROR("%s: invalid update reply magic %x expected %x:" + " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, + reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO); + GOTO(out, rc = -EPROTO); + } - req = ldlm_enqueue_pack(osp->opd_exp, 0); - if (IS_ERR(req)) - RETURN(PTR_ERR(req)); + rc = object_update_result_data_get(reply, lbuf, 0); + if (rc < 0) + GOTO(out, rc); - rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id, - (const ldlm_policy_data_t *)policy, - &flags, NULL, 0, LVB_T_NONE, lh, 0); + if (lbuf->lb_len < sizeof(*orr)) + GOTO(out, rc = -EPROTO); + orr = lbuf->lb_buf; + orr_le_to_cpu(orr, orr); + rc = orr->orr_size; + *pos = orr->orr_offset; +out: ptlrpc_req_finished(req); - return rc == ELDLM_OK ? 0 : -EIO; +out_update: + osp_update_request_destroy(env, update); + + RETURN(rc); } -struct dt_object_operations osp_md_obj_ops = { - .do_read_lock = osp_md_object_read_lock, - .do_write_lock = osp_md_object_write_lock, - .do_read_unlock = osp_md_object_read_unlock, - .do_write_unlock = osp_md_object_write_unlock, - .do_write_locked = osp_md_object_write_locked, - .do_declare_create = osp_md_declare_object_create, - .do_create = osp_md_object_create, - .do_declare_ref_add = osp_md_declare_ref_add, - .do_ref_add = osp_md_object_ref_add, - .do_declare_ref_del = osp_md_declare_object_ref_del, - .do_ref_del = osp_md_object_ref_del, - .do_declare_destroy = osp_md_declare_object_destroy, - .do_destroy = osp_md_object_destroy, - .do_ah_init = osp_md_ah_init, - .do_attr_get = osp_md_attr_get, - .do_declare_attr_set = osp_md_declare_attr_set, - .do_attr_set = osp_md_attr_set, - .do_declare_xattr_set = osp_md_declare_xattr_set, - .do_xattr_set = osp_md_xattr_set, - .do_xattr_get = osp_md_xattr_get, - .do_index_try = osp_md_index_try, - .do_object_lock = osp_md_object_lock, +/* These body operation will be used to write symlinks during migration etc */ +const struct dt_body_operations osp_md_body_ops = { + .dbo_declare_write = osp_md_declare_write, + .dbo_write = osp_md_write, + .dbo_read = osp_md_read, };