X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_md_object.c;h=21ccacf785f7ec74fa1c786d9bc5ed07ac549465;hb=f28cc25929c4e8a111e96b2205a0433542b35e84;hp=6738763690d790c1d1abf9df7808541023fc8631;hpb=b45e500e5a996d8529ab3d85d542908c93b1e1ce;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 6738763..21ccacf 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, 2014, Intel Corporation. + * Copyright (c) 2013, 2015, Intel Corporation. */ /* * lustre/osp/osp_md_object.c @@ -53,6 +53,7 @@ #define DEBUG_SUBSYSTEM S_MDS +#include #include #include "osp_internal.h" @@ -82,7 +83,7 @@ static int osp_object_create_interpreter(const struct lu_env *env, struct osp_object *obj, void *data, int index, int rc) { - if (rc != 0) { + if (rc != 0 && rc != -EEXIST) { obj->opo_obj.do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; obj->opo_non_exist = 1; } @@ -863,7 +864,9 @@ static int osp_md_index_try(const struct lu_env *env, * Implementation of dt_object_operations::do_object_lock * * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT, - * which will lock the object in the global namespace. + * which will lock the object in the global namespace. And because the + * cross-MDT locks are relatively rare compared with normal local MDT operation, + * let's release it right away, instead of putting it into the LRU list. * * \param[in] env execution environment * \param[in] dt object to be locked @@ -878,32 +881,39 @@ static int osp_md_object_lock(const struct lu_env *env, struct dt_object *dt, struct lustre_handle *lh, struct ldlm_enqueue_info *einfo, - ldlm_policy_data_t *policy) + union ldlm_policy_data *policy) { struct ldlm_res_id *res_id; struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); struct osp_device *osp = dt2osp_dev(dt_dev); + struct lu_device *top_device; struct ptlrpc_request *req; int rc = 0; - __u64 flags = 0; - ldlm_mode_t mode; + __u64 flags = LDLM_FL_NO_LRU; res_id = einfo->ei_res_id; LASSERT(res_id != NULL); - mode = ldlm_lock_match(osp->opd_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, res_id, - einfo->ei_type, policy, - einfo->ei_mode, lh, 0); - if (mode > 0) - return ELDLM_OK; + if (einfo->ei_nonblock) + flags |= LDLM_FL_BLOCK_NOWAIT; + if (einfo->ei_mode & (LCK_EX | LCK_PW)) + flags |= LDLM_FL_COS_INCOMPAT; req = ldlm_enqueue_pack(osp->opd_exp, 0); if (IS_ERR(req)) RETURN(PTR_ERR(req)); + /* During recovery, it needs to let OSP send enqueue + * without checking recoverying status, in case the + * other target is being recovered at the same time, + * and if we wait here for the import to be recovered, + * it might cause deadlock */ + top_device = dt_dev->dd_lu_dev.ld_site->ls_top_dev; + if (top_device->ld_obd->obd_recovering) + req->rq_allow_replay = 1; + rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id, - (const ldlm_policy_data_t *)policy, + (const union ldlm_policy_data *)policy, &flags, NULL, 0, LVB_T_NONE, lh, 0); ptlrpc_req_finished(req); @@ -926,7 +936,7 @@ static int osp_md_object_lock(const struct lu_env *env, static int osp_md_object_unlock(const struct lu_env *env, struct dt_object *dt, struct ldlm_enqueue_info *einfo, - ldlm_policy_data_t *policy) + union ldlm_policy_data *policy) { struct lustre_handle *lockh = einfo->ei_cbdata; @@ -956,35 +966,6 @@ int osp_md_declare_object_destroy(const struct lu_env *env, } /** - * Interpreter call for object destroy - * - * Object destroy interpreter, which will be called after destroying - * the remote object to set flags and status. - * - * \param[in] env execution environment - * \param[in] reply update reply - * \param[in] req ptlrpc update request for destroying object - * \param[in] obj object to be destroyed - * \param[in] data data used in this function. - * \param[in] index index(position) of destroy update in the whole - * updates - * \param[in] rc update result on the remote MDT. - * - * \retval only return 0 for now - */ -static int osp_md_object_destroy_interpreter(const struct lu_env *env, - struct object_update_reply *reply, - struct ptlrpc_request *req, - struct osp_object *obj, - void *data, int index, int rc) -{ - /* not needed in cache any more */ - set_bit(LU_OBJECT_HEARD_BANSHEE, - &obj->opo_obj.do_lu.lo_header->loh_flags); - return 0; -} - -/** * Implement OSP layer dt_object_operations::do_destroy() interface. * * Pack the destroy update into the RPC buffer, which will be sent @@ -1019,8 +1000,10 @@ int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); + set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, - osp_md_object_destroy_interpreter); + NULL); + RETURN(rc); } @@ -1072,7 +1055,20 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, const struct lu_buf *buf, loff_t pos, struct thandle *th) { - return osp_trans_update_request_create(th); + struct osp_device *osp = dt2osp_dev(th->th_dev); + int rc; + + rc = osp_trans_update_request_create(th); + if (rc != 0) + return rc; + + if (osp->opd_update == NULL) + return 0; + + if (dt2osp_obj(dt)->opo_stale) + return -ESTALE; + + return 0; } /** @@ -1104,13 +1100,17 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, update = thandle_to_osp_update_request(th); LASSERT(update != NULL); + CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n", + PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len); + rc = osp_update_rpc_pack(env, write, update, OUT_WRITE, lu_object_fid(&dt->do_lu), buf, *pos); if (rc < 0) RETURN(rc); - CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n", - PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len); + rc = osp_check_and_set_rpc_version(oth, obj); + if (rc < 0) + RETURN(rc); /* XXX: how about the write error happened later? */ *pos += buf->lb_len; @@ -1120,116 +1120,116 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, obj->opo_ooa->ooa_attr.la_size < *pos) obj->opo_ooa->ooa_attr.la_size = *pos; - rc = osp_check_and_set_rpc_version(oth); - if (rc < 0) - RETURN(rc); - RETURN(buf->lb_len); } static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *rbuf, loff_t *pos) { - struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct dt_device *dt_dev = &osp->opd_dt_dev; - struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; - struct osp_update_request *update = NULL; + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct dt_device *dt_dev = &osp->opd_dt_dev; + struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; + char *ptr = rbuf->lb_buf; + struct osp_update_request *update; + struct ptlrpc_request *req = NULL; + struct out_read_reply *orr; + struct ptlrpc_bulk_desc *desc; struct object_update_reply *reply; - struct out_read_reply *orr; - char *ptr = rbuf->lb_buf; - struct ptlrpc_request *req = NULL; - size_t total_length = rbuf->lb_len; - size_t max_buf_size; - loff_t offset = *pos; - int rc; + __u32 left_size; + int nbufs; + int i; + int rc; ENTRY; - /* Calculate the maxium buffer length for each read request */ - max_buf_size = OUT_UPDATE_REPLY_SIZE - cfs_size_round(sizeof(*orr)) - - cfs_size_round(sizeof(struct object_update_result)) - - cfs_size_round(offsetof(struct object_update_reply, - ourp_lens[1])); - while (total_length > 0) { - size_t read_length; - - /* Because it needs send the update buffer right away, - * just create an update buffer, instead of attaching the - * update_remote list of the thandle. */ - update = osp_update_request_create(dt_dev); - if (IS_ERR(update)) - GOTO(out, rc = PTR_ERR(update)); - - read_length = total_length > max_buf_size ? - max_buf_size : total_length; - - rc = osp_update_rpc_pack(env, read, update, OUT_READ, - lu_object_fid(&dt->do_lu), - read_length, offset); - if (rc != 0) { - CERROR("%s: cannot insert update: rc = %d\n", - dt_dev->dd_lu_dev.ld_obd->obd_name, rc); - GOTO(out, rc); - } - - rc = osp_remote_sync(env, osp, update, &req); - if (rc < 0) - GOTO(out, rc); - - reply = req_capsule_server_sized_get(&req->rq_pill, - &RMF_OUT_UPDATE_REPLY, - OUT_UPDATE_REPLY_SIZE); + /* Because it needs send the update buffer right away, + * just create an update buffer, instead of attaching the + * update_remote list of the thandle. */ + update = osp_update_request_create(dt_dev); + if (IS_ERR(update)) + RETURN(PTR_ERR(update)); - if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { - CERROR("%s: invalid update reply magic %x expected %x:" - " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, - reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO); - GOTO(out, rc = -EPROTO); - } + rc = osp_update_rpc_pack(env, read, update, OUT_READ, + lu_object_fid(&dt->do_lu), + rbuf->lb_len, *pos); + if (rc != 0) { + CERROR("%s: cannot insert update: rc = %d\n", + dt_dev->dd_lu_dev.ld_obd->obd_name, rc); + GOTO(out_update, rc); + } - rc = object_update_result_data_get(reply, lbuf, 0); - if (rc < 0) - GOTO(out, rc); + CDEBUG(D_INFO, "%s "DFID" read offset %llu size %zu\n", + dt_dev->dd_lu_dev.ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), *pos, rbuf->lb_len); + rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update, + &req); + if (rc != 0) + GOTO(out_update, rc); + + nbufs = (rbuf->lb_len + OUT_BULK_BUFFER_SIZE - 1) / + OUT_BULK_BUFFER_SIZE; + /* allocate bulk descriptor */ + desc = ptlrpc_prep_bulk_imp(req, nbufs, 1, + PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KVEC, + MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); + + /* split the buffer into small chunk size */ + left_size = rbuf->lb_len; + for (i = 0; i < nbufs; i++) { + int read_size; + + read_size = left_size > OUT_BULK_BUFFER_SIZE ? + OUT_BULK_BUFFER_SIZE : left_size; + desc->bd_frag_ops->add_iov_frag(desc, ptr, read_size); + + ptr += read_size; + } - if (lbuf->lb_len < sizeof(*orr)) - GOTO(out, rc = -EPROTO); + /* This will only be called with read-only update, and these updates + * might be used to retrieve update log during recovery process, so + * it will be allowed to send during recovery process */ + req->rq_allow_replay = 1; + req->rq_bulk_read = 1; + /* send request to master and wait for RPC to complete */ + rc = ptlrpc_queue_wait(req); + if (rc != 0) + GOTO(out, rc); - orr = lbuf->lb_buf; - orr_le_to_cpu(orr, orr); - offset = orr->orr_offset; - if (orr->orr_size > max_buf_size) - GOTO(out, rc = -EPROTO); + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) + GOTO(out, rc); - memcpy(ptr, orr->orr_data, orr->orr_size); - ptr += orr->orr_size; - total_length -= orr->orr_size; + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); - CDEBUG(D_INFO, "%s: read "DFID" pos "LPU64" len %u left %zu\n", - osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)), - offset, orr->orr_size, total_length); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { + CERROR("%s: invalid update reply magic %x expected %x:" + " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, + reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO); + GOTO(out, rc = -EPROTO); + } - if (orr->orr_size < read_length) - break; + rc = object_update_result_data_get(reply, lbuf, 0); + if (rc < 0) + GOTO(out, rc); - ptlrpc_req_finished(req); - osp_update_request_destroy(update); - req = NULL; - update = NULL; - } + if (lbuf->lb_len < sizeof(*orr)) + GOTO(out, rc = -EPROTO); - total_length = rbuf->lb_len - total_length; - *pos = offset; - CDEBUG(D_INFO, "%s: total read "DFID" pos "LPU64" len %zu\n", - osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)), - *pos, total_length); - GOTO(out, rc = (int)total_length); + orr = lbuf->lb_buf; + orr_le_to_cpu(orr, orr); + rc = orr->orr_size; + *pos = orr->orr_offset; out: - if (req != NULL) - ptlrpc_req_finished(req); + ptlrpc_req_finished(req); - if (update != NULL) - osp_update_request_destroy(update); +out_update: + osp_update_request_destroy(update); - return rc; + RETURN(rc); } /* These body operation will be used to write symlinks during migration etc */