X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ftarget%2Ftgt_handler.c;h=06180874da1d806d6c490406811c5885316d44a6;hp=e3e63dbc3b394eb5d079d2ae2364a0014db1b08e;hb=2b294992edce5af7b79d4300ed3aa1ea6a8db850;hpb=f0c1b06f2418d2016e23eb6e8277be85ca13fc53 diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c index e3e63db..0618087 100644 --- a/lustre/target/tgt_handler.c +++ b/lustre/target/tgt_handler.c @@ -21,7 +21,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2011, 2012, Intel Corporation. + * Copyright (c) 2011, 2014, Intel Corporation. */ /* * lustre/target/tgt_handler.c @@ -36,6 +36,10 @@ #include #include +#include +#include +#include +#include #include "tgt_internal.h" @@ -73,29 +77,29 @@ static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags) tsi->tsi_mdt_body = body; - if (!(body->valid & OBD_MD_FLID)) + if (!(body->mbo_valid & OBD_MD_FLID)) RETURN(0); /* mdc_pack_body() doesn't check if fid is zero and set OBD_ML_FID * in any case in pre-2.5 clients. Fix that here if needed */ - if (unlikely(fid_is_zero(&body->fid1))) + if (unlikely(fid_is_zero(&body->mbo_fid1))) RETURN(0); - if (!fid_is_sane(&body->fid1)) { + if (!fid_is_sane(&body->mbo_fid1)) { CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt), - PFID(&body->fid1)); + PFID(&body->mbo_fid1)); RETURN(-EINVAL); } obj = lu_object_find(tsi->tsi_env, &tsi->tsi_tgt->lut_bottom->dd_lu_dev, - &body->fid1, NULL); + &body->mbo_fid1, NULL); if (!IS_ERR(obj)) { if ((flags & HABEO_CORPUS) && !lu_object_exists(obj)) { lu_object_put(tsi->tsi_env, obj); /* for capability renew ENOENT will be handled in * mdt_renew_capa */ - if (body->valid & OBD_MD_FLOSSCAPA) + if (body->mbo_valid & OBD_MD_FLOSSCAPA) rc = 0; else rc = -ENOENT; @@ -106,88 +110,245 @@ static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags) } else { rc = PTR_ERR(obj); } + + tsi->tsi_fid = body->mbo_fid1; + RETURN(rc); } -static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags) +/** + * Validate oa from client. + * If the request comes from 2.0 clients, currently only RSVD seq and IDIF + * req are valid. + * a. objects in Single MDT FS seq = FID_SEQ_OST_MDT0, oi_id != 0 + * b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to + * pack ost_id. Because non-zero oi_seq will make it diffcult to tell + * whether this is oi_fid or real ostid. So it will check + * OBD_CONNECT_FID, then convert the ostid to FID for old client. + * c. Old FID-disable osc will send IDIF. + * d. new FID-enable osc/osp will send normal FID. + * + * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will + * be used for LAST_ID file, and only being accessed inside OST now. + */ +int tgt_validate_obdo(struct tgt_session_info *tsi, struct obdo *oa) +{ + struct ost_id *oi = &oa->o_oi; + u64 seq = ostid_seq(oi); + u64 id = ostid_id(oi); + int rc; + ENTRY; + + if (unlikely(!(exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_FID) && + fid_seq_is_echo(seq))) { + /* Sigh 2.[123] client still sends echo req with oi_id = 0 + * during create, and we will reset this to 1, since this + * oi_id is basically useless in the following create process, + * but oi_id == 0 will make it difficult to tell whether it is + * real FID or ost_id. */ + oi->oi_fid.f_seq = FID_SEQ_ECHO; + oi->oi_fid.f_oid = id ?: 1; + oi->oi_fid.f_ver = 0; + } else { + struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); + + if (unlikely((oa->o_valid & OBD_MD_FLID) && id == 0)) + GOTO(out, rc = -EPROTO); + + /* Note: this check might be forced in 2.5 or 2.6, i.e. + * all of the requests are required to setup FLGROUP */ + if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) { + ostid_set_seq_mdt0(oi); + oa->o_valid |= OBD_MD_FLGROUP; + seq = ostid_seq(oi); + } + + if (unlikely(!(fid_seq_is_idif(seq) || fid_seq_is_mdt0(seq) || + fid_seq_is_norm(seq) || fid_seq_is_echo(seq)))) + GOTO(out, rc = -EPROTO); + + rc = ostid_to_fid(&tti->tti_fid1, oi, + tsi->tsi_tgt->lut_lsd.lsd_osd_index); + if (unlikely(rc != 0)) + GOTO(out, rc); + + oi->oi_fid = tti->tti_fid1; + } + + RETURN(0); + +out: + CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n", + tgt_name(tsi->tsi_tgt), obd_export_nid2str(tsi->tsi_exp), + seq, id, rc); + return rc; +} +EXPORT_SYMBOL(tgt_validate_obdo); + +static int tgt_io_data_unpack(struct tgt_session_info *tsi, struct ost_id *oi) { + unsigned max_brw; + struct niobuf_remote *rnb; + struct obd_ioobj *ioo; + int obj_count; + + ENTRY; + + ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ); + if (ioo == NULL) + RETURN(-EPROTO); + + rnb = req_capsule_client_get(tsi->tsi_pill, &RMF_NIOBUF_REMOTE); + if (rnb == NULL) + RETURN(-EPROTO); + + max_brw = ioobj_max_brw_get(ioo); + if (unlikely((max_brw & (max_brw - 1)) != 0)) { + CERROR("%s: client %s sent bad ioobj max %u for "DOSTID + ": rc = %d\n", tgt_name(tsi->tsi_tgt), + obd_export_nid2str(tsi->tsi_exp), max_brw, + POSTID(oi), -EPROTO); + RETURN(-EPROTO); + } + ioo->ioo_oid = *oi; + + obj_count = req_capsule_get_size(tsi->tsi_pill, &RMF_OBD_IOOBJ, + RCL_CLIENT) / sizeof(*ioo); + if (obj_count == 0) { + CERROR("%s: short ioobj\n", tgt_name(tsi->tsi_tgt)); + RETURN(-EPROTO); + } else if (obj_count > 1) { + CERROR("%s: too many ioobjs (%d)\n", tgt_name(tsi->tsi_tgt), + obj_count); + RETURN(-EPROTO); + } + + if (ioo->ioo_bufcnt == 0) { + CERROR("%s: ioo has zero bufcnt\n", tgt_name(tsi->tsi_tgt)); + RETURN(-EPROTO); + } + + if (ioo->ioo_bufcnt > PTLRPC_MAX_BRW_PAGES) { + DEBUG_REQ(D_RPCTRACE, tgt_ses_req(tsi), + "bulk has too many pages (%d)", + ioo->ioo_bufcnt); + RETURN(-EPROTO); + } + + RETURN(0); +} + +static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags) +{ + struct ost_body *body; struct req_capsule *pill = tsi->tsi_pill; + struct lustre_capa *capa; + struct lu_nodemap *nodemap; int rc; ENTRY; - if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) { - rc = tgt_mdt_body_unpack(tsi, flags); - } else { - rc = 0; + body = req_capsule_client_get(pill, &RMF_OST_BODY); + if (body == NULL) + RETURN(-EFAULT); + + rc = tgt_validate_obdo(tsi, &body->oa); + if (rc) + RETURN(rc); + + nodemap = tsi->tsi_exp->exp_target_data.ted_nodemap; + + body->oa.o_uid = nodemap_map_id(nodemap, NODEMAP_UID, + NODEMAP_CLIENT_TO_FS, + body->oa.o_uid); + body->oa.o_gid = nodemap_map_id(nodemap, NODEMAP_GID, + NODEMAP_CLIENT_TO_FS, + body->oa.o_gid); + + if (body->oa.o_valid & OBD_MD_FLOSSCAPA) { + capa = req_capsule_client_get(pill, &RMF_CAPA1); + if (capa == NULL) { + CERROR("%s: OSSCAPA flag is set without capability\n", + tgt_name(tsi->tsi_tgt)); + RETURN(-EFAULT); + } } - if (flags & HABEO_REFERO) { - /* Pack reply */ - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, - tsi->tsi_mdt_body->eadatasize); - if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_LOGCOOKIES, - RCL_SERVER, 0); + tsi->tsi_ost_body = body; + tsi->tsi_fid = body->oa.o_oi.oi_fid; - rc = req_capsule_server_pack(pill); + if (req_capsule_has_field(pill, &RMF_OBD_IOOBJ, RCL_CLIENT)) { + rc = tgt_io_data_unpack(tsi, &body->oa.o_oi); + if (rc < 0) + RETURN(rc); } + + if (!(body->oa.o_valid & OBD_MD_FLID)) { + if (flags & HABEO_CORPUS) { + CERROR("%s: OBD_MD_FLID flag is not set in ost_body " + "but OID/FID is mandatory with HABEO_CORPUS\n", + tgt_name(tsi->tsi_tgt)); + RETURN(-EPROTO); + } else { + RETURN(0); + } + } + + ost_fid_build_resid(&tsi->tsi_fid, &tsi->tsi_resid); + + /* + * OST doesn't get object in advance for further use to prevent + * situations with nested object_find which is potential deadlock. + */ + tsi->tsi_corpus = NULL; RETURN(rc); } /* - * Invoke handler for this request opc. Also do necessary preprocessing - * (according to handler ->th_flags), and post-processing (setting of - * ->last_{xid,committed}). + * Do necessary preprocessing according to handler ->th_flags. */ -static int tgt_handle_request0(struct tgt_session_info *tsi, - struct tgt_handler *h, - struct ptlrpc_request *req) +static int tgt_request_preprocess(struct tgt_session_info *tsi, + struct tgt_handler *h, + struct ptlrpc_request *req) { - int serious = 0; - int rc; - __u32 flags; + struct req_capsule *pill = tsi->tsi_pill; + __u32 flags = h->th_flags; + int rc = 0; ENTRY; + if (tsi->tsi_preprocessed) + RETURN(0); + LASSERT(h->th_act != NULL); LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg)); LASSERT(current->journal_info == NULL); - /* - * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try - * to put same checks into handlers like mdt_close(), mdt_reint(), - * etc., without talking to mdt authors first. Checking same thing - * there again is useless and returning 0 error without packing reply - * is buggy! Handlers either pack reply or return error. - * - * We return 0 here and do not send any reply in order to emulate - * network failure. Do not send any reply in case any of NET related - * fail_id has occured. - */ - if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE)) - RETURN(0); - - rc = 0; - flags = h->th_flags; LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->th_fmt != NULL)); if (h->th_fmt != NULL) { - req_capsule_set(tsi->tsi_pill, h->th_fmt); - rc = tgt_unpack_req_pack_rep(tsi, flags); + req_capsule_set(pill, h->th_fmt); + if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) { + rc = tgt_mdt_body_unpack(tsi, flags); + if (rc < 0) + RETURN(rc); + } else if (req_capsule_has_field(pill, &RMF_OST_BODY, + RCL_CLIENT)) { + rc = tgt_ost_body_unpack(tsi, flags); + if (rc < 0) + RETURN(rc); + } } - if (rc == 0 && flags & MUTABOR && - tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY) - rc = -EROFS; + if (flags & MUTABOR && tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY) + RETURN(-EROFS); - if (rc == 0 && flags & HABEO_CLAVIS) { + if (flags & HABEO_CLAVIS) { struct ldlm_request *dlm_req; LASSERT(h->th_fmt != NULL); - dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ); + dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ); if (dlm_req != NULL) { if (unlikely(dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS && @@ -208,6 +369,54 @@ static int tgt_handle_request0(struct tgt_session_info *tsi, rc = -EFAULT; } } + tsi->tsi_preprocessed = 1; + RETURN(rc); +} + +/* + * Invoke handler for this request opc. Also do necessary preprocessing + * (according to handler ->th_flags), and post-processing (setting of + * ->last_{xid,committed}). + */ +static int tgt_handle_request0(struct tgt_session_info *tsi, + struct tgt_handler *h, + struct ptlrpc_request *req) +{ + int serious = 0; + int rc; + + ENTRY; + + /* + * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try + * to put same checks into handlers like mdt_close(), mdt_reint(), + * etc., without talking to mdt authors first. Checking same thing + * there again is useless and returning 0 error without packing reply + * is buggy! Handlers either pack reply or return error. + * + * We return 0 here and do not send any reply in order to emulate + * network failure. Do not send any reply in case any of NET related + * fail_id has occured. + */ + if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE)) + RETURN(0); + + rc = tgt_request_preprocess(tsi, h, req); + /* pack reply if reply format is fixed */ + if (rc == 0 && h->th_flags & HABEO_REFERO) { + /* Pack reply */ + if (req_capsule_has_field(tsi->tsi_pill, &RMF_MDT_MD, + RCL_SERVER)) + req_capsule_set_size(tsi->tsi_pill, &RMF_MDT_MD, + RCL_SERVER, + tsi->tsi_mdt_body->mbo_eadatasize); + if (req_capsule_has_field(tsi->tsi_pill, &RMF_LOGCOOKIES, + RCL_SERVER)) + req_capsule_set_size(tsi->tsi_pill, &RMF_LOGCOOKIES, + RCL_SERVER, 0); + + rc = req_capsule_server_pack(tsi->tsi_pill); + } if (likely(rc == 0)) { /* @@ -242,11 +451,6 @@ static int tgt_handle_request0(struct tgt_session_info *tsi, LASSERT(current->journal_info == NULL); - /* - * If we're DISCONNECTing, the export_data is already freed - * - * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT)) - */ if (likely(rc == 0 && req->rq_export)) target_committed_to_req(req); @@ -268,10 +472,17 @@ static int tgt_filter_recovery_request(struct ptlrpc_request *req, case MDS_SYNC: /* used in unmounting */ case OBD_PING: case MDS_REINT: - case UPDATE_OBJ: + case OUT_UPDATE: case SEQ_QUERY: case FLD_QUERY: + case FLD_READ: case LDLM_ENQUEUE: + case OST_CREATE: + case OST_DESTROY: + case OST_PUNCH: + case OST_SETATTR: + case OST_SYNC: + case OST_WRITE: *process = target_queue_recovery_request(req, obd); RETURN(0); @@ -288,7 +499,7 @@ static int tgt_filter_recovery_request(struct ptlrpc_request *req, * -ve: abort immediately with the given error code; * 0: send reply with error code in req->rq_status; */ -int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id) +static int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id) { ENTRY; @@ -343,13 +554,45 @@ int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id) RETURN(+1); } +/* Initial check for request, it is validation mostly */ +static struct tgt_handler *tgt_handler_find_check(struct ptlrpc_request *req) +{ + struct tgt_handler *h; + struct tgt_opc_slice *s; + struct lu_target *tgt; + __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + + ENTRY; + + tgt = class_exp2tgt(req->rq_export); + + for (s = tgt->lut_slice; s->tos_hs != NULL; s++) + if (s->tos_opc_start <= opc && opc < s->tos_opc_end) + break; + + /* opcode was not found in slice */ + if (unlikely(s->tos_hs == NULL)) { + CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), + opc); + RETURN(ERR_PTR(-ENOTSUPP)); + } + + LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end); + h = s->tos_hs + (opc - s->tos_opc_start); + if (unlikely(h->th_opc == 0)) { + CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc); + RETURN(ERR_PTR(-ENOTSUPP)); + } + + RETURN(h); +} + int tgt_request_handle(struct ptlrpc_request *req) { struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env); struct lustre_msg *msg = req->rq_reqmsg; struct tgt_handler *h; - struct tgt_opc_slice *s; struct lu_target *tgt; int request_fail_id = 0; __u32 opc = lustre_msg_get_opc(msg); @@ -374,6 +617,10 @@ int tgt_request_handle(struct ptlrpc_request *req) rc = ptlrpc_error(req); GOTO(out, rc); } + /* recovery-small test 18c asks to drop connect reply */ + if (unlikely(opc == OST_CONNECT && + OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))) + GOTO(out, rc = 0); } if (unlikely(!class_connected_export(req->rq_export))) { @@ -386,18 +633,17 @@ int tgt_request_handle(struct ptlrpc_request *req) tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export); tsi->tsi_exp = req->rq_export; + if (exp_connect_flags(req->rq_export) & OBD_CONNECT_JOBSTATS) + tsi->tsi_jobid = lustre_msg_get_jobid(req->rq_reqmsg); + else + tsi->tsi_jobid = NULL; request_fail_id = tgt->lut_request_fail_id; tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id; - for (s = tgt->lut_slice; s->tos_hs != NULL; s++) - if (s->tos_opc_start <= opc && opc < s->tos_opc_end) - break; - - /* opcode was not found in slice */ - if (unlikely(s->tos_hs == NULL)) { - CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc); - req->rq_status = -ENOTSUPP; + h = tgt_handler_find_check(req); + if (IS_ERR(h)) { + req->rq_status = PTR_ERR(h); rc = ptlrpc_error(req); GOTO(out, rc); } @@ -405,17 +651,6 @@ int tgt_request_handle(struct ptlrpc_request *req) if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE)) GOTO(out, rc = 0); - LASSERT(current->journal_info == NULL); - - LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end); - h = s->tos_hs + (opc - s->tos_opc_start); - if (unlikely(h->th_opc == 0)) { - CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc); - req->rq_status = -ENOTSUPP; - rc = ptlrpc_error(req); - GOTO(out, rc); - } - rc = lustre_msg_check_version(msg, h->th_version); if (unlikely(rc)) { DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version" @@ -437,18 +672,48 @@ int tgt_request_handle(struct ptlrpc_request *req) EXIT; out: req_capsule_fini(tsi->tsi_pill); - tsi->tsi_pill = NULL; if (tsi->tsi_corpus != NULL) { lu_object_put(tsi->tsi_env, tsi->tsi_corpus); tsi->tsi_corpus = NULL; } - tsi->tsi_env = NULL; - tsi->tsi_mdt_body = NULL; - tsi->tsi_dlm_req = NULL; return rc; } EXPORT_SYMBOL(tgt_request_handle); +/** Assign high priority operations to the request if needed. */ +int tgt_hpreq_handler(struct ptlrpc_request *req) +{ + struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env); + struct tgt_handler *h; + int rc; + + ENTRY; + + if (req->rq_export == NULL) + RETURN(0); + + req_capsule_init(&req->rq_pill, req, RCL_SERVER); + tsi->tsi_pill = &req->rq_pill; + tsi->tsi_env = req->rq_svc_thread->t_env; + tsi->tsi_tgt = class_exp2tgt(req->rq_export); + tsi->tsi_exp = req->rq_export; + + h = tgt_handler_find_check(req); + if (IS_ERR(h)) { + rc = PTR_ERR(h); + RETURN(rc); + } + + rc = tgt_request_preprocess(tsi, h, req); + if (unlikely(rc != 0)) + RETURN(rc); + + if (h->th_hp != NULL) + h->th_hp(tsi); + RETURN(0); +} +EXPORT_SYMBOL(tgt_hpreq_handler); + void tgt_counter_incr(struct obd_export *exp, int opcode) { lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode); @@ -544,7 +809,7 @@ static int tgt_init_sec_level(struct ptlrpc_request *req) RETURN(-EACCES); } } else { - if (req->rq_auth_uid == INVALID_UID) { + if (!uid_valid(make_kuid(&init_user_ns, req->rq_auth_uid))) { CDEBUG(D_SEC, "client %s -> target %s: user is not " "authenticated!\n", client, tgt_name(tgt)); RETURN(-EACCES); @@ -637,6 +902,30 @@ int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp return rc; } +int tgt_adapt_sptlrpc_conf(struct lu_target *tgt, int initial) +{ + struct sptlrpc_rule_set tmp_rset; + int rc; + + sptlrpc_rule_set_init(&tmp_rset); + rc = sptlrpc_conf_target_get_rules(tgt->lut_obd, &tmp_rset, initial); + if (rc) { + CERROR("%s: failed get sptlrpc rules: rc = %d\n", + tgt_name(tgt), rc); + return rc; + } + + sptlrpc_target_update_exp_flavor(tgt->lut_obd, &tmp_rset); + + write_lock(&tgt->lut_sptlrpc_lock); + sptlrpc_rule_set_free(&tgt->lut_sptlrpc_rset); + tgt->lut_sptlrpc_rset = tmp_rset; + write_unlock(&tgt->lut_sptlrpc_lock); + + return 0; +} +EXPORT_SYMBOL(tgt_adapt_sptlrpc_conf); + int tgt_connect(struct tgt_session_info *tsi) { struct ptlrpc_request *req = tgt_ses_req(tsi); @@ -757,7 +1046,7 @@ EXPORT_SYMBOL(tgt_sendpage); /* * OBD_IDX_READ handler */ -int tgt_obd_idx_read(struct tgt_session_info *tsi) +static int tgt_obd_idx_read(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); struct lu_rdpg *rdpg = &tti->tti_u.rdpg.tti_rdpg; @@ -848,12 +1137,90 @@ TGT_OBD_HDL (0, OBD_IDX_READ, tgt_obd_idx_read) }; EXPORT_SYMBOL(tgt_obd_handlers); +int tgt_sync(const struct lu_env *env, struct lu_target *tgt, + struct dt_object *obj, __u64 start, __u64 end) +{ + int rc = 0; + + ENTRY; + + /* if no objid is specified, it means "sync whole filesystem" */ + if (obj == NULL) { + rc = dt_sync(env, tgt->lut_bottom); + } else if (dt_version_get(env, obj) > + tgt->lut_obd->obd_last_committed) { + rc = dt_object_sync(env, obj, start, end); + } + + RETURN(rc); +} +EXPORT_SYMBOL(tgt_sync); /* * Unified target DLM handlers. */ -struct ldlm_callback_suite tgt_dlm_cbs = { + +/* Ensure that data and metadata are synced to the disk when lock is cancelled + * (if requested) */ +static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) +{ + struct lu_env env; + struct lu_target *tgt; + struct dt_object *obj; + struct lu_fid fid; + int rc = 0; + + ENTRY; + + tgt = class_exp2tgt(lock->l_export); + + if (flag == LDLM_CB_CANCELING && + (lock->l_granted_mode & (LCK_PW | LCK_GROUP)) && + (tgt->lut_sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL || + (tgt->lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL && + lock->l_flags & LDLM_FL_CBPENDING))) { + __u64 start = 0; + __u64 end = OBD_OBJECT_EOF; + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (unlikely(rc != 0)) + RETURN(rc); + + ost_fid_from_resid(&fid, &lock->l_resource->lr_name, + tgt->lut_lsd.lsd_osd_index); + obj = dt_locate(&env, tgt->lut_bottom, &fid); + if (IS_ERR(obj)) + GOTO(err_env, rc = PTR_ERR(obj)); + + if (!dt_object_exists(obj)) + GOTO(err_put, rc = -ENOENT); + + if (lock->l_resource->lr_type == LDLM_EXTENT) { + start = lock->l_policy_data.l_extent.start; + end = lock->l_policy_data.l_extent.end; + } + + rc = tgt_sync(&env, tgt, obj, start, end); + if (rc < 0) { + CERROR("%s: syncing "DFID" ("LPU64"-"LPU64") on lock " + "cancel: rc = %d\n", + tgt_name(tgt), PFID(&fid), + lock->l_policy_data.l_extent.start, + lock->l_policy_data.l_extent.end, rc); + } +err_put: + lu_object_put(&env, &obj->do_lu); +err_env: + lu_env_fini(&env); + } + + rc = ldlm_server_blocking_ast(lock, desc, data, flag); + RETURN(rc); +} + +static struct ldlm_callback_suite tgt_dlm_cbs = { .lcs_completion = ldlm_server_completion_ast, - .lcs_blocking = ldlm_server_blocking_ast, + .lcs_blocking = tgt_blocking_ast, .lcs_glimpse = ldlm_server_glimpse_ast }; @@ -1004,7 +1371,7 @@ EXPORT_SYMBOL(tgt_llog_handlers); * sec context handlers */ /* XXX: Implement based on mdt_sec_ctx_handle()? */ -int tgt_sec_ctx_handle(struct tgt_session_info *tsi) +static int tgt_sec_ctx_handle(struct tgt_session_info *tsi) { return 0; } @@ -1015,3 +1382,742 @@ TGT_SEC_HDL_VAR(0, SEC_CTX_INIT_CONT, tgt_sec_ctx_handle), TGT_SEC_HDL_VAR(0, SEC_CTX_FINI, tgt_sec_ctx_handle), }; EXPORT_SYMBOL(tgt_sec_ctx_handlers); + +int (*tgt_lfsck_in_notify)(const struct lu_env *env, + struct dt_device *key, + struct lfsck_request *lr, + struct thandle *th) = NULL; + +void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *, + struct dt_device *, + struct lfsck_request *, + struct thandle *)) +{ + tgt_lfsck_in_notify = notify; +} +EXPORT_SYMBOL(tgt_register_lfsck_in_notify); + +static int (*tgt_lfsck_query)(const struct lu_env *env, + struct dt_device *key, + struct lfsck_request *lr) = NULL; + +void tgt_register_lfsck_query(int (*query)(const struct lu_env *, + struct dt_device *, + struct lfsck_request *)) +{ + tgt_lfsck_query = query; +} +EXPORT_SYMBOL(tgt_register_lfsck_query); + +/* LFSCK request handlers */ +static int tgt_handle_lfsck_notify(struct tgt_session_info *tsi) +{ + const struct lu_env *env = tsi->tsi_env; + struct dt_device *key = tsi->tsi_tgt->lut_bottom; + struct lfsck_request *lr; + int rc; + ENTRY; + + lr = req_capsule_client_get(tsi->tsi_pill, &RMF_LFSCK_REQUEST); + if (lr == NULL) + RETURN(-EPROTO); + + rc = tgt_lfsck_in_notify(env, key, lr, NULL); + + RETURN(rc); +} + +static int tgt_handle_lfsck_query(struct tgt_session_info *tsi) +{ + struct lfsck_request *request; + struct lfsck_reply *reply; + int rc; + ENTRY; + + request = req_capsule_client_get(tsi->tsi_pill, &RMF_LFSCK_REQUEST); + if (request == NULL) + RETURN(-EPROTO); + + reply = req_capsule_server_get(tsi->tsi_pill, &RMF_LFSCK_REPLY); + if (reply == NULL) + RETURN(-ENOMEM); + + rc = tgt_lfsck_query(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, request); + reply->lr_status = rc; + + RETURN(rc < 0 ? rc : 0); +} + +struct tgt_handler tgt_lfsck_handlers[] = { +TGT_LFSCK_HDL(HABEO_REFERO, LFSCK_NOTIFY, tgt_handle_lfsck_notify), +TGT_LFSCK_HDL(HABEO_REFERO, LFSCK_QUERY, tgt_handle_lfsck_query), +}; +EXPORT_SYMBOL(tgt_lfsck_handlers); + +/* + * initialize per-thread page pool (bug 5137). + */ +int tgt_io_thread_init(struct ptlrpc_thread *thread) +{ + struct tgt_thread_big_cache *tbc; + + ENTRY; + + LASSERT(thread != NULL); + LASSERT(thread->t_data == NULL); + + OBD_ALLOC_LARGE(tbc, sizeof(*tbc)); + if (tbc == NULL) + RETURN(-ENOMEM); + thread->t_data = tbc; + RETURN(0); +} +EXPORT_SYMBOL(tgt_io_thread_init); + +/* + * free per-thread pool created by tgt_thread_init(). + */ +void tgt_io_thread_done(struct ptlrpc_thread *thread) +{ + struct tgt_thread_big_cache *tbc; + + ENTRY; + + LASSERT(thread != NULL); + + /* + * be prepared to handle partially-initialized pools (because this is + * called from ost_io_thread_init() for cleanup. + */ + tbc = thread->t_data; + if (tbc != NULL) { + OBD_FREE_LARGE(tbc, sizeof(*tbc)); + thread->t_data = NULL; + } + EXIT; +} +EXPORT_SYMBOL(tgt_io_thread_done); +/** + * Helper function for getting server side [start, start+count] DLM lock + * if asked by client. + */ +int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, + __u64 start, __u64 end, struct lustre_handle *lh, + int mode, __u64 *flags) +{ + ldlm_policy_data_t policy; + int rc; + + ENTRY; + + LASSERT(lh != NULL); + LASSERT(ns != NULL); + LASSERT(!lustre_handle_is_used(lh)); + + policy.l_extent.gid = 0; + policy.l_extent.start = start & CFS_PAGE_MASK; + + /* + * If ->o_blocks is EOF it means "lock till the end of the file". + * Otherwise, it's size of an extent or hole being punched (in bytes). + */ + if (end == OBD_OBJECT_EOF || end < start) + policy.l_extent.end = OBD_OBJECT_EOF; + else + policy.l_extent.end = end | ~CFS_PAGE_MASK; + + rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_EXTENT, &policy, mode, + flags, ldlm_blocking_ast, + ldlm_completion_ast, ldlm_glimpse_ast, + NULL, 0, LVB_T_NONE, NULL, lh); + RETURN(rc == ELDLM_OK ? 0 : -EIO); +} +EXPORT_SYMBOL(tgt_extent_lock); + +void tgt_extent_unlock(struct lustre_handle *lh, ldlm_mode_t mode) +{ + LASSERT(lustre_handle_is_used(lh)); + ldlm_lock_decref(lh, mode); +} +EXPORT_SYMBOL(tgt_extent_unlock); + +int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, + struct obd_ioobj *obj, struct niobuf_remote *nb, + struct lustre_handle *lh, int mode) +{ + __u64 flags = 0; + int nrbufs = obj->ioo_bufcnt; + int i; + + ENTRY; + + LASSERT(mode == LCK_PR || mode == LCK_PW); + LASSERT(!lustre_handle_is_used(lh)); + + if (nrbufs == 0 || !(nb[0].rnb_flags & OBD_BRW_SRVLOCK)) + RETURN(0); + + for (i = 1; i < nrbufs; i++) + if (!(nb[i].rnb_flags & OBD_BRW_SRVLOCK)) + RETURN(-EFAULT); + + RETURN(tgt_extent_lock(ns, res_id, nb[0].rnb_offset, + nb[nrbufs - 1].rnb_offset + + nb[nrbufs - 1].rnb_len - 1, + lh, mode, &flags)); +} +EXPORT_SYMBOL(tgt_brw_lock); + +void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob, + struct lustre_handle *lh, int mode) +{ + ENTRY; + + LASSERT(mode == LCK_PR || mode == LCK_PW); + LASSERT((obj->ioo_bufcnt > 0 && + (niob[0].rnb_flags & OBD_BRW_SRVLOCK)) == + lustre_handle_is_used(lh)); + + if (lustre_handle_is_used(lh)) + tgt_extent_unlock(lh, mode); + EXIT; +} +EXPORT_SYMBOL(tgt_brw_unlock); + +static __u32 tgt_checksum_bulk(struct lu_target *tgt, + struct ptlrpc_bulk_desc *desc, int opc, + cksum_type_t cksum_type) +{ + struct cfs_crypto_hash_desc *hdesc; + unsigned int bufsize; + int i, err; + unsigned char cfs_alg = cksum_obd2cfs(cksum_type); + __u32 cksum; + + hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0); + if (IS_ERR(hdesc)) { + CERROR("%s: unable to initialize checksum hash %s\n", + tgt_name(tgt), cfs_crypto_hash_name(cfs_alg)); + return PTR_ERR(hdesc); + } + + CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg)); + for (i = 0; i < desc->bd_iov_count; i++) { + /* corrupt the data before we compute the checksum, to + * simulate a client->OST data error */ + if (i == 0 && opc == OST_WRITE && + OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) { + int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK; + int len = desc->bd_iov[i].kiov_len; + struct page *np = tgt_page_to_corrupt; + char *ptr = kmap(desc->bd_iov[i].kiov_page) + off; + + if (np) { + char *ptr2 = kmap(np) + off; + + memcpy(ptr2, ptr, len); + memcpy(ptr2, "bad3", min(4, len)); + kunmap(np); + desc->bd_iov[i].kiov_page = np; + } else { + CERROR("%s: can't alloc page for corruption\n", + tgt_name(tgt)); + } + } + cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page, + desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK, + desc->bd_iov[i].kiov_len); + + /* corrupt the data after we compute the checksum, to + * simulate an OST->client data error */ + if (i == 0 && opc == OST_READ && + OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) { + int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK; + int len = desc->bd_iov[i].kiov_len; + struct page *np = tgt_page_to_corrupt; + char *ptr = kmap(desc->bd_iov[i].kiov_page) + off; + + if (np) { + char *ptr2 = kmap(np) + off; + + memcpy(ptr2, ptr, len); + memcpy(ptr2, "bad4", min(4, len)); + kunmap(np); + desc->bd_iov[i].kiov_page = np; + } else { + CERROR("%s: can't alloc page for corruption\n", + tgt_name(tgt)); + } + } + } + + bufsize = sizeof(cksum); + err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize); + + return cksum; +} + +int tgt_brw_read(struct tgt_session_info *tsi) +{ + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct ptlrpc_bulk_desc *desc = NULL; + struct obd_export *exp = tsi->tsi_exp; + struct niobuf_remote *remote_nb; + struct niobuf_local *local_nb; + struct obd_ioobj *ioo; + struct ost_body *body, *repbody; + struct l_wait_info lwi; + struct lustre_handle lockh = { 0 }; + int npages, nob = 0, rc, i, no_reply = 0; + struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data; + + ENTRY; + + if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) { + CERROR("%s: deny read request from %s to portal %u\n", + tgt_name(tsi->tsi_tgt), + obd_export_nid2str(req->rq_export), + ptlrpc_req2svc(req)->srv_req_portal); + RETURN(-EPROTO); + } + + req->rq_bulk_read = 1; + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK)) + RETURN(-EIO); + + OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ? + cfs_fail_val : (obd_timeout + 1) / 4); + + /* Check if there is eviction in progress, and if so, wait for it to + * finish */ + if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) { + /* We do not care how long it takes */ + lwi = LWI_INTR(NULL, NULL); + rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq, + !atomic_read(&exp->exp_obd->obd_evict_inprogress), + &lwi); + } + + /* There must be big cache in current thread to process this request + * if it is NULL then something went wrong and it wasn't allocated, + * report -ENOMEM in that case */ + if (tbc == NULL) + RETURN(-ENOMEM); + + body = tsi->tsi_ost_body; + LASSERT(body != NULL); + + ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ); + LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */ + + remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE); + LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */ + + local_nb = tbc->local; + + rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo, + remote_nb, &lockh, LCK_PR); + if (rc != 0) + RETURN(rc); + + /* + * If getting the lock took more time than + * client was willing to wait, drop it. b=11330 + */ + if (cfs_time_current_sec() > req->rq_deadline || + OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) { + no_reply = 1; + CERROR("Dropping timed-out read from %s because locking" + "object "DOSTID" took %ld seconds (limit was %ld).\n", + libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid), + cfs_time_current_sec() - req->rq_arrival_time.tv_sec, + req->rq_deadline - req->rq_arrival_time.tv_sec); + GOTO(out_lock, rc = -ETIMEDOUT); + } + + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + repbody->oa = body->oa; + + npages = PTLRPC_MAX_BRW_PAGES; + rc = obd_preprw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1, + ioo, remote_nb, &npages, local_nb, NULL, BYPASS_CAPA); + if (rc != 0) + GOTO(out_lock, rc); + + desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo), + BULK_PUT_SOURCE, OST_BULK_PORTAL); + if (desc == NULL) + GOTO(out_commitrw, rc = -ENOMEM); + + nob = 0; + for (i = 0; i < npages; i++) { + int page_rc = local_nb[i].lnb_rc; + + if (page_rc < 0) { + rc = page_rc; + break; + } + + nob += page_rc; + if (page_rc != 0) { /* some data! */ + LASSERT(local_nb[i].lnb_page != NULL); + ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].lnb_page, + local_nb[i].lnb_page_offset, + page_rc); + } + + if (page_rc != local_nb[i].lnb_len) { /* short read */ + /* All subsequent pages should be 0 */ + while (++i < npages) + LASSERT(local_nb[i].lnb_rc == 0); + break; + } + } + + if (body->oa.o_valid & OBD_MD_FLCKSUM) { + cksum_type_t cksum_type = + cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ? + body->oa.o_flags : 0); + repbody->oa.o_flags = cksum_type_pack(cksum_type); + repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; + repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc, + OST_READ, cksum_type); + CDEBUG(D_PAGE, "checksum at read origin: %x\n", + repbody->oa.o_cksum); + } else { + repbody->oa.o_valid = 0; + } + /* We're finishing using body->oa as an input variable */ + + /* Check if client was evicted while we were doing i/o before touching + * network */ + if (likely(rc == 0 && + !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) { + rc = target_bulk_io(exp, desc, &lwi); + no_reply = rc != 0; + } + +out_commitrw: + /* Must commit after prep above in all cases */ + rc = obd_commitrw(tsi->tsi_env, OBD_BRW_READ, exp, + &repbody->oa, 1, ioo, remote_nb, npages, local_nb, + NULL, rc); + if (rc == 0) + tgt_drop_id(exp, &repbody->oa); +out_lock: + tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PR); + + if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)) + ptlrpc_free_bulk_nopin(desc); + + LASSERT(rc <= 0); + if (rc == 0) { + rc = nob; + ptlrpc_lprocfs_brw(req, nob); + } else if (no_reply) { + req->rq_no_reply = 1; + /* reply out callback would free */ + ptlrpc_req_drop_rs(req); + LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), " + "client will retry: rc %d\n", + exp->exp_obd->obd_name, + obd_uuid2str(&exp->exp_client_uuid), + obd_export_nid2str(exp), rc); + } + /* send a bulk after reply to simulate a network delay or reordering + * by a router */ + if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) { + wait_queue_head_t waitq; + struct l_wait_info lwi1; + + CDEBUG(D_INFO, "reorder BULK\n"); + init_waitqueue_head(&waitq); + + lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL); + l_wait_event(waitq, 0, &lwi1); + target_bulk_io(exp, desc, &lwi); + ptlrpc_free_bulk_nopin(desc); + } + + RETURN(rc); +} +EXPORT_SYMBOL(tgt_brw_read); + +static void tgt_warn_on_cksum(struct ptlrpc_request *req, + struct ptlrpc_bulk_desc *desc, + struct niobuf_local *local_nb, int npages, + u32 client_cksum, u32 server_cksum, + bool mmap) +{ + struct obd_export *exp = req->rq_export; + struct ost_body *body; + char *router; + char *via; + + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body != NULL); + + if (req->rq_peer.nid == desc->bd_sender) { + via = router = ""; + } else { + via = " via "; + router = libcfs_nid2str(desc->bd_sender); + } + + if (mmap) { + CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n", + client_cksum, server_cksum); + return; + } + + LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode " + DFID" object "DOSTID" extent ["LPU64"-"LPU64 + "]: client csum %x, server csum %x\n", + exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer), + via, router, + body->oa.o_valid & OBD_MD_FLFID ? + body->oa.o_parent_seq : (__u64)0, + body->oa.o_valid & OBD_MD_FLFID ? + body->oa.o_parent_oid : 0, + body->oa.o_valid & OBD_MD_FLFID ? + body->oa.o_parent_ver : 0, + POSTID(&body->oa.o_oi), + local_nb[0].lnb_file_offset, + local_nb[npages-1].lnb_file_offset + + local_nb[npages - 1].lnb_len - 1, + client_cksum, server_cksum); +} + +int tgt_brw_write(struct tgt_session_info *tsi) +{ + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct ptlrpc_bulk_desc *desc = NULL; + struct obd_export *exp = req->rq_export; + struct niobuf_remote *remote_nb; + struct niobuf_local *local_nb; + struct obd_ioobj *ioo; + struct ost_body *body, *repbody; + struct l_wait_info lwi; + struct lustre_handle lockh = {0}; + __u32 *rcs; + int objcount, niocount, npages; + int rc, i, j; + cksum_type_t cksum_type = OBD_CKSUM_CRC32; + bool no_reply = false, mmap; + struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data; + + ENTRY; + + if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) { + CERROR("%s: deny write request from %s to portal %u\n", + tgt_name(tsi->tsi_tgt), + obd_export_nid2str(req->rq_export), + ptlrpc_req2svc(req)->srv_req_portal); + RETURN(err_serious(-EPROTO)); + } + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) + RETURN(err_serious(-ENOSPC)); + if (OBD_FAIL_TIMEOUT(OBD_FAIL_OST_EROFS, 1)) + RETURN(err_serious(-EROFS)); + + req->rq_bulk_write = 1; + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK)) + RETURN(err_serious(-EIO)); + if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2)) + RETURN(err_serious(-EFAULT)); + + /* pause before transaction has been started */ + CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ? + cfs_fail_val : (obd_timeout + 1) / 4); + + /* There must be big cache in current thread to process this request + * if it is NULL then something went wrong and it wasn't allocated, + * report -ENOMEM in that case */ + if (tbc == NULL) + RETURN(-ENOMEM); + + body = tsi->tsi_ost_body; + LASSERT(body != NULL); + + ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ); + LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */ + + objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ, + RCL_CLIENT) / sizeof(*ioo); + + for (niocount = i = 0; i < objcount; i++) + niocount += ioo[i].ioo_bufcnt; + + remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE); + LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */ + if (niocount != req_capsule_get_size(&req->rq_pill, + &RMF_NIOBUF_REMOTE, RCL_CLIENT) / + sizeof(*remote_nb)) + RETURN(err_serious(-EPROTO)); + + if ((remote_nb[0].rnb_flags & OBD_BRW_MEMALLOC) && + (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self)) + memory_pressure_set(); + + req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER, + niocount * sizeof(*rcs)); + rc = req_capsule_server_pack(&req->rq_pill); + if (rc != 0) + GOTO(out, rc = err_serious(rc)); + + CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val); + rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS); + + local_nb = tbc->local; + + rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo, + remote_nb, &lockh, LCK_PW); + if (rc != 0) + GOTO(out, rc); + + /* + * If getting the lock took more time than + * client was willing to wait, drop it. b=11330 + */ + if (cfs_time_current_sec() > req->rq_deadline || + OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) { + no_reply = true; + CERROR("%s: Dropping timed-out write from %s because locking " + "object "DOSTID" took %ld seconds (limit was %ld).\n", + tgt_name(tsi->tsi_tgt), libcfs_id2str(req->rq_peer), + POSTID(&ioo->ioo_oid), + cfs_time_current_sec() - req->rq_arrival_time.tv_sec, + req->rq_deadline - req->rq_arrival_time.tv_sec); + GOTO(out_lock, rc = -ETIMEDOUT); + } + + /* Because we already sync grant info with client when reconnect, + * grant info will be cleared for resent req, then fed_grant and + * total_grant will not be modified in following preprw_write */ + if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) { + DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info"); + body->oa.o_valid &= ~OBD_MD_FLGRANT; + } + + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (repbody == NULL) + GOTO(out_lock, rc = -ENOMEM); + repbody->oa = body->oa; + + npages = PTLRPC_MAX_BRW_PAGES; + rc = obd_preprw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa, + objcount, ioo, remote_nb, &npages, local_nb, NULL, + BYPASS_CAPA); + if (rc < 0) + GOTO(out_lock, rc); + + desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo), + BULK_GET_SINK, OST_BULK_PORTAL); + if (desc == NULL) + GOTO(skip_transfer, rc = -ENOMEM); + + /* NB Having prepped, we must commit... */ + for (i = 0; i < npages; i++) + ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].lnb_page, + local_nb[i].lnb_page_offset, + local_nb[i].lnb_len); + + rc = sptlrpc_svc_prep_bulk(req, desc); + if (rc != 0) + GOTO(skip_transfer, rc); + + rc = target_bulk_io(exp, desc, &lwi); + no_reply = rc != 0; + +skip_transfer: + if (body->oa.o_valid & OBD_MD_FLCKSUM && rc == 0) { + static int cksum_counter; + + if (body->oa.o_valid & OBD_MD_FLFLAGS) + cksum_type = cksum_type_unpack(body->oa.o_flags); + + repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; + repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL; + repbody->oa.o_flags |= cksum_type_pack(cksum_type); + repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc, + OST_WRITE, cksum_type); + cksum_counter++; + + if (unlikely(body->oa.o_cksum != repbody->oa.o_cksum)) { + mmap = (body->oa.o_valid & OBD_MD_FLFLAGS && + body->oa.o_flags & OBD_FL_MMAP); + + tgt_warn_on_cksum(req, desc, local_nb, npages, + body->oa.o_cksum, + repbody->oa.o_cksum, mmap); + cksum_counter = 0; + } else if ((cksum_counter & (-cksum_counter)) == + cksum_counter) { + CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n", + cksum_counter, libcfs_id2str(req->rq_peer), + repbody->oa.o_cksum); + } + } + + /* Must commit after prep above in all cases */ + rc = obd_commitrw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa, + objcount, ioo, remote_nb, npages, local_nb, NULL, + rc); + if (rc == -ENOTCONN) + /* quota acquire process has been given up because + * either the client has been evicted or the client + * has timed out the request already */ + no_reply = true; + + /* + * Disable sending mtime back to the client. If the client locked the + * whole object, then it has already updated the mtime on its side, + * otherwise it will have to glimpse anyway (see bug 21489, comment 32) + */ + repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME); + + if (rc == 0) { + int nob = 0; + + /* set per-requested niobuf return codes */ + for (i = j = 0; i < niocount; i++) { + int len = remote_nb[i].rnb_len; + + nob += len; + rcs[i] = 0; + do { + LASSERT(j < npages); + if (local_nb[j].lnb_rc < 0) + rcs[i] = local_nb[j].lnb_rc; + len -= local_nb[j].lnb_len; + j++; + } while (len > 0); + LASSERT(len == 0); + } + LASSERT(j == npages); + ptlrpc_lprocfs_brw(req, nob); + + tgt_drop_id(exp, &repbody->oa); + } +out_lock: + tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PW); + if (desc) + ptlrpc_free_bulk_nopin(desc); +out: + if (no_reply) { + req->rq_no_reply = 1; + /* reply out callback would free */ + ptlrpc_req_drop_rs(req); + LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), " + "client will retry: rc %d\n", + exp->exp_obd->obd_name, + obd_uuid2str(&exp->exp_client_uuid), + obd_export_nid2str(exp), rc); + } + memory_pressure_clr(); + RETURN(rc); +} +EXPORT_SYMBOL(tgt_brw_write);