-/*
- * this page is allocated statically when module is initializing
- * it is used to simulate data corruptions, see ost_checksum_bulk()
- * for details. as the original pages provided by the layers below
- * can be remain in the internal cache, we do not want to modify
- * them.
- */
-static struct page *ost_page_to_corrupt = NULL;
-
-/**
- * Do not return server-side uid/gid to remote client
- */
-static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
-{
- if (exp_connect_rmtclient(exp)) {
- oa->o_uid = -1;
- oa->o_gid = -1;
- oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
- }
-}
-
-/**
- * Validate oa from client.
- * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
- * req are valid.
- * a. objects in Single MDT FS seq = FID_SEQ_OST_MDT0, oi_id != 0
- * b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
- * pack ost_id. Because non-zero oi_seq will make it diffcult to tell
- * whether this is oi_fid or real ostid. So it will check
- * OBD_CONNECT_FID, then convert the ostid to FID for old client.
- * c. Old FID-disable osc will send IDIF.
- * d. new FID-enable osc/osp will send normal FID.
- *
- * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
- * be used for LAST_ID file, and only being accessed inside OST now.
- */
-static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
- struct obd_ioobj *ioobj)
-{
- int rc = 0;
-
- if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
- fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
- /* Sigh 2.[123] client still sends echo req with oi_id = 0
- * during create, and we will reset this to 1, since this
- * oi_id is basically useless in the following create process,
- * but oi_id == 0 will make it difficult to tell whether it is
- * real FID or ost_id. */
- oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
- oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
- oa->o_oi.oi_fid.f_ver = 0;
- } else {
- if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
- GOTO(out, rc = -EPROTO);
-
- /* Note: this check might be forced in 2.5 or 2.6, i.e.
- * all of the requests are required to setup FLGROUP */
- if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
- ostid_set_seq_mdt0(&oa->o_oi);
- if (ioobj)
- ostid_set_seq_mdt0(&ioobj->ioo_oid);
- oa->o_valid |= OBD_MD_FLGROUP;
- }
-
- if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
- fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
- fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
- fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
- GOTO(out, rc = -EPROTO);
- }
-
- if (ioobj != NULL) {
- unsigned max_brw = ioobj_max_brw_get(ioobj);
-
- if (unlikely((max_brw & (max_brw - 1)) != 0)) {
- CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
- ": rc = -EPROTO\n", exp->exp_obd->obd_name,
- obd_export_nid2str(exp), max_brw,
- POSTID(&oa->o_oi));
- GOTO(out, rc = -EPROTO);
- }
- ioobj->ioo_oid = oa->o_oi;
- }
-
-out:
- if (rc != 0)
- CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
- exp->exp_obd->obd_name, obd_export_nid2str(exp),
- oa ? ostid_seq(&oa->o_oi) : -1,
- oa ? ostid_id(&oa->o_oi) : -1, rc);
- return rc;
-}
-
-void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
-{
- struct oti_req_ack_lock *ack_lock;
- int i;
-
- if (oti == NULL)
- return;
-
- if (req->rq_repmsg) {
- __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
- lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
- versions[0] = oti->oti_pre_version;
- lustre_msg_set_versions(req->rq_repmsg, versions);
- }
- req->rq_transno = oti->oti_transno;
-
- /* XXX 4 == entries in oti_ack_locks??? */
- for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
- if (!ack_lock->mode)
- break;
- /* XXX not even calling target_send_reply in some cases... */
- ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
- }
-}
-
-static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- struct lustre_capa *capa = NULL;
- int rc;
- ENTRY;
-
- /* Get the request body */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- if (ostid_id(&body->oa.o_oi) == 0)
- RETURN(-EPROTO);
-
- rc = ost_validate_obdo(exp, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- /* If there's a DLM request, cancel the locks mentioned in it*/
- if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
- struct ldlm_request *dlm;
-
- dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm == NULL)
- RETURN (-EFAULT);
- ldlm_request_cancel(req, dlm, 0);
- }
-
- /* If there's a capability, get it */
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST DESTROY");
- RETURN (-EFAULT);
- }
- }
-
- /* Prepare the reply */
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- /* Get the log cancellation cookie */
- if (body->oa.o_valid & OBD_MD_FLCOOKIE)
- oti->oti_logcookies = &body->oa.o_lcookie;
-
- /* Finish the reply */
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-
- /* Do the destroy and set the reply status accordingly */
- req->rq_status = obd_destroy(req->rq_svc_thread->t_env, exp,
- &repbody->oa, NULL, oti, NULL, capa);
- RETURN(0);
-}
-
-/**
- * Helper function for getting server side [start, start+count] DLM lock
- * if asked by client.
- */
-static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
- __u64 start, __u64 count, struct lustre_handle *lh,
- int mode, __u64 flags)
-{
- struct ldlm_res_id res_id;
- ldlm_policy_data_t policy;
- __u64 end = start + count;
-
- ENTRY;
-
- LASSERT(!lustre_handle_is_used(lh));
- /* o_id and o_gr are used for localizing resource, if client miss to set
- * them, do not trigger ASSERTION. */
- if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
- (OBD_MD_FLID | OBD_MD_FLGROUP)))
- RETURN(-EPROTO);
-
- if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
- !(oa->o_flags & OBD_FL_SRVLOCK))
- RETURN(0);
-
- if (mode == LCK_MINMODE)
- RETURN(0);
-
- ostid_build_res_name(&oa->o_oi, &res_id);
- CDEBUG(D_INODE, "OST-side extent lock.\n");
-
- policy.l_extent.start = start & CFS_PAGE_MASK;
-
- /* If ->o_blocks is EOF it means "lock till the end of the
- * file". Otherwise, it's size of a hole being punched (in bytes) */
- if (count == OBD_OBJECT_EOF || end < start)
- policy.l_extent.end = OBD_OBJECT_EOF;
- else
- policy.l_extent.end = end | ~CFS_PAGE_MASK;
-
- RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
- LDLM_EXTENT, &policy, mode, &flags,
- ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
- NULL, lh));
-}
-
-/* Helper function: release lock, if any. */
-static void ost_lock_put(struct obd_export *exp,
- struct lustre_handle *lh, int mode)
-{
- ENTRY;
- if (lustre_handle_is_used(lh))
- ldlm_lock_decref(lh, mode);
- EXIT;
-}
-
-static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
-{
- struct ost_body *body, *repbody;
- struct obd_info *oinfo;
- struct lustre_handle lh = { 0 };
- struct lustre_capa *capa = NULL;
- ldlm_mode_t lock_mode;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST GETATTR");
- RETURN(-EFAULT);
- }
- }
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
-
- lock_mode = LCK_MINMODE;
- if (body->oa.o_valid & OBD_MD_FLFLAGS &&
- body->oa.o_flags & OBD_FL_SRVLOCK) {
- lock_mode = LCK_PR;
- if (body->oa.o_flags & OBD_FL_FLUSH)
- lock_mode = LCK_PW;
- }
- rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh,
- lock_mode, 0);
- if (rc)
- RETURN(rc);
-
- OBD_ALLOC_PTR(oinfo);
- if (!oinfo)
- GOTO(unlock, rc = -ENOMEM);
- oinfo->oi_oa = &repbody->oa;
- oinfo->oi_capa = capa;
-
- req->rq_status = obd_getattr(req->rq_svc_thread->t_env, exp, oinfo);
-
- OBD_FREE_PTR(oinfo);
-
- ost_drop_id(exp, &repbody->oa);
-
- if (!(repbody->oa.o_valid & OBD_MD_FLFLAGS)) {
- repbody->oa.o_valid |= OBD_MD_FLFLAGS;
- repbody->oa.o_flags = 0;
- }
- repbody->oa.o_flags |= OBD_FL_FLUSH;
-
-unlock:
- ost_lock_put(exp, &lh, lock_mode);
- RETURN(rc);
-}
-
-static int ost_statfs(struct ptlrpc_request *req)
-{
- struct obd_statfs *osfs;
- int rc;
- ENTRY;
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
-
- req->rq_status = obd_statfs(req->rq_svc_thread->t_env, req->rq_export,
- osfs,
- cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
- 0);
- if (req->rq_status != 0)
- CERROR("ost: statfs failed: rc %d\n", req->rq_status);
-
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
- req->rq_status = -EINPROGRESS;
-
- RETURN(0);
-}
-
-static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
- oti->oti_logcookies = &body->oa.o_lcookie;
-
- req->rq_status = obd_create(req->rq_svc_thread->t_env, exp,
- &repbody->oa, NULL, oti);
- //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
- RETURN(0);
-}
-
-static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- __u64 flags = 0;
- struct lustre_handle lh = {0,};
- int rc;
- ENTRY;
-
- /* check that we do support OBD_CONNECT_TRUNCLOCK. */
- CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
- (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
- RETURN(-EPROTO);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- /* standard truncate optimization: if file body is completely
- * destroyed, don't send data back to the server. */
- if (body->oa.o_size == 0)
- flags |= LDLM_FL_AST_DISCARD_DATA;
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
-
- rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size,
- repbody->oa.o_blocks, &lh, LCK_PW, flags);
- if (rc == 0) {
- struct obd_info *oinfo;
- struct lustre_capa *capa = NULL;
-
- if (repbody->oa.o_valid & OBD_MD_FLFLAGS &&
- repbody->oa.o_flags == OBD_FL_SRVLOCK)
- /*
- * If OBD_FL_SRVLOCK is the only bit set in
- * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
- * through filter_setattr() to filter_iocontrol().
- */
- repbody->oa.o_valid &= ~OBD_MD_FLFLAGS;
-
- if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill,
- &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST PUNCH");
- GOTO(unlock, rc = -EFAULT);
- }
- }
-
- OBD_ALLOC_PTR(oinfo);
- if (!oinfo)
- GOTO(unlock, rc = -ENOMEM);
- oinfo->oi_oa = &repbody->oa;
- oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
- oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
- oinfo->oi_capa = capa;
- oinfo->oi_flags = OBD_FL_PUNCH;
-
- req->rq_status = obd_punch(req->rq_svc_thread->t_env, exp,
- oinfo, oti, NULL);
- OBD_FREE_PTR(oinfo);
-unlock:
- ost_lock_put(exp, &lh, LCK_PW);
- }
-
- ost_drop_id(exp, &repbody->oa);
- RETURN(rc);
-}
-
-static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- struct obd_info *oinfo;
- struct lustre_capa *capa = NULL;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST SYNC");
- RETURN (-EFAULT);
- }
- }
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
-
- OBD_ALLOC_PTR(oinfo);
- if (!oinfo)
- RETURN(-ENOMEM);
-
- oinfo->oi_oa = &repbody->oa;
- oinfo->oi_capa = capa;
- oinfo->oi_jobid = oti->oti_jobid;
- req->rq_status = obd_sync(req->rq_svc_thread->t_env, exp, oinfo,
- repbody->oa.o_size, repbody->oa.o_blocks,
- NULL);
- OBD_FREE_PTR(oinfo);
-
- ost_drop_id(exp, &repbody->oa);
- RETURN(0);
-}
-
-static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- struct obd_info *oinfo;
- struct lustre_capa *capa = NULL;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST SETATTR");
- RETURN (-EFAULT);
- }
- }
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
-
- OBD_ALLOC_PTR(oinfo);
- if (!oinfo)
- RETURN(-ENOMEM);
- oinfo->oi_oa = &repbody->oa;
- oinfo->oi_capa = capa;
-
- req->rq_status = obd_setattr(req->rq_svc_thread->t_env, exp, oinfo,
- oti);
-
- OBD_FREE_PTR(oinfo);
-
- ost_drop_id(exp, &repbody->oa);
- RETURN(0);
-}
-
-static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
- cksum_type_t cksum_type)
-{
- struct cfs_crypto_hash_desc *hdesc;
- unsigned int bufsize;
- int i, err;
- unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
- __u32 cksum;
-
- hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
- if (IS_ERR(hdesc)) {
- CERROR("Unable to initialize checksum hash %s\n",
- cfs_crypto_hash_name(cfs_alg));
- return PTR_ERR(hdesc);
- }
- CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
- for (i = 0; i < desc->bd_iov_count; i++) {
-
- /* corrupt the data before we compute the checksum, to
- * simulate a client->OST data error */
- if (i == 0 && opc == OST_WRITE &&
- OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
- int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
- int len = desc->bd_iov[i].kiov_len;
- struct page *np = ost_page_to_corrupt;
- char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
-
- if (np) {
- char *ptr2 = kmap(np) + off;
-
- memcpy(ptr2, ptr, len);
- memcpy(ptr2, "bad3", min(4, len));
- kunmap(np);
- desc->bd_iov[i].kiov_page = np;
- } else {
- CERROR("can't alloc page for corruption\n");
- }
- }
- cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
- desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK,
- desc->bd_iov[i].kiov_len);
-
- /* corrupt the data after we compute the checksum, to
- * simulate an OST->client data error */
- if (i == 0 && opc == OST_READ &&
- OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
- int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
- int len = desc->bd_iov[i].kiov_len;
- struct page *np = ost_page_to_corrupt;
- char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
-
- if (np) {
- char *ptr2 = kmap(np) + off;
-
- memcpy(ptr2, ptr, len);
- memcpy(ptr2, "bad4", min(4, len));
- kunmap(np);
- desc->bd_iov[i].kiov_page = np;
- } else {
- CERROR("can't alloc page for corruption\n");
- }
- }
- }
-
- bufsize = 4;
- err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
- if (err)
- cfs_crypto_hash_final(hdesc, NULL, NULL);
-
- return cksum;
-}
-
-static int ost_brw_lock_get(int mode, struct obd_export *exp,
- struct obd_ioobj *obj, struct niobuf_remote *nb,
- struct lustre_handle *lh)
-{
- __u64 flags = 0;
- int nrbufs = obj->ioo_bufcnt;
- struct ldlm_res_id res_id;
- ldlm_policy_data_t policy;
- int i;
- ENTRY;
-
- ostid_build_res_name(&obj->ioo_oid, &res_id);
- LASSERT(mode == LCK_PR || mode == LCK_PW);
- LASSERT(!lustre_handle_is_used(lh));
-
- if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
- RETURN(0);
-
- for (i = 1; i < nrbufs; i ++)
- if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
- (nb[i].flags & OBD_BRW_SRVLOCK))
- RETURN(-EFAULT);
-
- policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
- policy.l_extent.end = (nb[nrbufs - 1].offset +
- nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
-
- RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
- LDLM_EXTENT, &policy, mode, &flags,
- ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
- NULL, lh));
-}
-
-static void ost_brw_lock_put(int mode,
- struct obd_ioobj *obj, struct niobuf_remote *niob,
- struct lustre_handle *lh)
-{
- ENTRY;
- LASSERT(mode == LCK_PR || mode == LCK_PW);
- LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
- lustre_handle_is_used(lh));
- if (lustre_handle_is_used(lh))
- ldlm_lock_decref(lh, mode);
- EXIT;
-}
-
-/* Allocate thread local buffers if needed */
-static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
-{
- struct ost_thread_local_cache *tls =
- (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-
- /* In normal mode of operation an I/O request is serviced only
- * by ll_ost_io threads each of them has own tls buffers allocated by
- * ost_io_thread_init().
- * During recovery, an I/O request may be queued until any of the ost
- * service threads process it. Not necessary it should be one of
- * ll_ost_io threads. In that case we dynamically allocating tls
- * buffers for the request service time. */
- if (unlikely(tls == NULL)) {
- LASSERT(r->rq_export->exp_in_recovery);
- OBD_ALLOC_PTR(tls);
- if (tls != NULL) {
- tls->temporary = 1;
- r->rq_svc_thread->t_data = tls;
- }
- }
- return tls;
-}
-
-/* Free thread local buffers if they were allocated only for servicing
- * this one request */
-static void ost_tls_put(struct ptlrpc_request *r)
-{
- struct ost_thread_local_cache *tls =
- (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-
- if (unlikely(tls->temporary)) {
- OBD_FREE_PTR(tls);
- r->rq_svc_thread->t_data = NULL;
- }
-}
-
-static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
-{
- struct ptlrpc_bulk_desc *desc = NULL;
- struct obd_export *exp = req->rq_export;
- struct niobuf_remote *remote_nb;
- struct niobuf_local *local_nb;
- struct obd_ioobj *ioo;
- struct ost_body *body, *repbody;
- struct lustre_capa *capa = NULL;
- struct l_wait_info lwi;
- struct lustre_handle lockh = { 0 };
- int niocount, npages, nob = 0, rc, i;
- int no_reply = 0;
- struct ost_thread_local_cache *tls;
- ENTRY;
-
- req->rq_bulk_read = 1;
-
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
- GOTO(out, rc = -EIO);
-
- OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
-
- /* Check if there is eviction in progress, and if so, wait for it to
- * finish */
- if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
- lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
- rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
- !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
- &lwi);
- }
- if (exp->exp_failed)
- GOTO(out, rc = -ENOTCONN);
-
- /* ost_body, ioobj & noibuf_remote are verified and swabbed in
- * ost_rw_hpreq_check(). */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- GOTO(out, rc = -EFAULT);
-
- /*
- * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
- * would be useful here and wherever we get &RMF_OBD_IOOBJ and
- * &RMF_NIOBUF_REMOTE.
- */
- ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
- if (ioo == NULL)
- GOTO(out, rc = -EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, ioo);
- if (rc)
- RETURN(rc);
-
- niocount = ioo->ioo_bufcnt;
- remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
- if (remote_nb == NULL)
- GOTO(out, rc = -EFAULT);
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST BRW READ");
- GOTO(out, rc = -EFAULT);
- }
- }
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- GOTO(out, rc);
-
- tls = ost_tls_get(req);
- if (tls == NULL)
- GOTO(out_bulk, rc = -ENOMEM);
- local_nb = tls->local;
-
- rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
- if (rc != 0)
- GOTO(out_tls, rc);
-
- /*
- * If getting the lock took more time than
- * client was willing to wait, drop it. b=11330
- */
- if (cfs_time_current_sec() > req->rq_deadline ||
- OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
- no_reply = 1;
- CERROR("Dropping timed-out read from %s because locking"
- "object "DOSTID" took %ld seconds (limit was %ld).\n",
- libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
- cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
- req->rq_deadline - req->rq_arrival_time.tv_sec);
- GOTO(out_lock, rc = -ETIMEDOUT);
- }
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
-
- npages = OST_THREAD_POOL_SIZE;
- rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
- &repbody->oa, 1, ioo, remote_nb, &npages, local_nb,
- oti, capa);
- if (rc != 0)
- GOTO(out_lock, rc);
-
- desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
- BULK_PUT_SOURCE, OST_BULK_PORTAL);
- if (desc == NULL)
- GOTO(out_commitrw, rc = -ENOMEM);
-
- nob = 0;
- for (i = 0; i < npages; i++) {
- int page_rc = local_nb[i].rc;
-
- if (page_rc < 0) { /* error */
- rc = page_rc;
- break;
- }
-
- nob += page_rc;
- if (page_rc != 0) { /* some data! */
- LASSERT (local_nb[i].page != NULL);
- ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
- local_nb[i].lnb_page_offset,
- page_rc);
- }
-
- if (page_rc != local_nb[i].len) { /* short read */
- /* All subsequent pages should be 0 */
- while(++i < npages)
- LASSERT(local_nb[i].rc == 0);
- break;
- }
- }
-
- if (body->oa.o_valid & OBD_MD_FLCKSUM) {
- cksum_type_t cksum_type =
- cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ?
- repbody->oa.o_flags : 0);
- repbody->oa.o_flags = cksum_type_pack(cksum_type);
- repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
- repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
- CDEBUG(D_PAGE, "checksum at read origin: %x\n",
- repbody->oa.o_cksum);
- } else {
- repbody->oa.o_valid = 0;
- }
- /* We're finishing using body->oa as an input variable */
-
- /* Check if client was evicted while we were doing i/o before touching
- network */
- if (rc == 0) {
- if (likely(!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)))
- rc = target_bulk_io(exp, desc, &lwi);
- no_reply = rc != 0;
- }
-
-out_commitrw:
- /* Must commit after prep above in all cases */
- rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
- &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
- oti, rc);
-
- if (rc == 0)
- ost_drop_id(exp, &repbody->oa);
-
-out_lock:
- ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
-out_tls:
- ost_tls_put(req);
-out_bulk:
- if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
- ptlrpc_free_bulk_nopin(desc);
-out:
- LASSERT(rc <= 0);
- if (rc == 0) {
- req->rq_status = nob;
- ptlrpc_lprocfs_brw(req, nob);
- target_committed_to_req(req);
- ptlrpc_reply(req);
- } else if (!no_reply) {
- /* Only reply if there was no comms problem with bulk */
- target_committed_to_req(req);
- req->rq_status = rc;
- ptlrpc_error(req);
- } else {
- /* reply out callback would free */
- ptlrpc_req_drop_rs(req);
- LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
- "client will retry: rc %d\n",
- exp->exp_obd->obd_name,
- obd_uuid2str(&exp->exp_client_uuid),
- obd_export_nid2str(exp), rc);
- }
- /* send a bulk after reply to simulate a network delay or reordering
- * by a router */
- if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
- wait_queue_head_t waitq;
- struct l_wait_info lwi1;
-
- CDEBUG(D_INFO, "reorder BULK\n");
- init_waitqueue_head(&waitq);
-
- lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
- l_wait_event(waitq, 0, &lwi1);
- rc = target_bulk_io(exp, desc, &lwi);
- ptlrpc_free_bulk_nopin(desc);
- }
-
- RETURN(rc);
-}
-
-static void ost_warn_on_cksum(struct ptlrpc_request *req,
- struct ptlrpc_bulk_desc *desc,
- struct niobuf_local *local_nb, int npages,
- obd_count client_cksum, obd_count server_cksum,
- int mmap)
-{
- struct obd_export *exp = req->rq_export;
- struct ost_body *body;
- char *router;
- char *via;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT (body != NULL);
-
- if (req->rq_peer.nid == desc->bd_sender) {
- via = router = "";
- } else {
- via = " via ";
- router = libcfs_nid2str(desc->bd_sender);
- }
-
- if (mmap) {
- CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n",
- client_cksum, server_cksum);
- return;
- }
-
- LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode "
- DFID" object "DOSTID" extent ["LPU64"-"LPU64
- "]: client csum %x, server csum %x\n",
- exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer),
- via, router,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_seq : (__u64)0,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_oid : 0,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_ver : 0,
- POSTID(&body->oa.o_oi),
- local_nb[0].lnb_file_offset,
- local_nb[npages-1].lnb_file_offset +
- local_nb[npages-1].len - 1,
- client_cksum, server_cksum);
-}
-
-static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
-{
- struct ptlrpc_bulk_desc *desc = NULL;
- struct obd_export *exp = req->rq_export;
- struct niobuf_remote *remote_nb;
- struct niobuf_local *local_nb;
- struct obd_ioobj *ioo;
- struct ost_body *body, *repbody;
- struct l_wait_info lwi;
- struct lustre_handle lockh = {0};
- struct lustre_capa *capa = NULL;
- __u32 *rcs;
- int objcount, niocount, npages;
- int rc, i, j;
- obd_count client_cksum = 0, server_cksum = 0;
- cksum_type_t cksum_type = OBD_CKSUM_CRC32;
- int no_reply = 0, mmap = 0;
- __u32 o_uid = 0, o_gid = 0;
- struct ost_thread_local_cache *tls;
- ENTRY;
-
- req->rq_bulk_write = 1;
-
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
- GOTO(out, rc = -EIO);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
- GOTO(out, rc = -EFAULT);
-
- /* pause before transaction has been started */
- OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
-
- /* ost_body, ioobj & noibuf_remote are verified and swabbed in
- * ost_rw_hpreq_check(). */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- GOTO(out, rc = -EFAULT);
-
- objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
- RCL_CLIENT) / sizeof(*ioo);
- ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
- if (ioo == NULL)
- GOTO(out, rc = -EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, ioo);
- if (rc)
- RETURN(rc);
-
- for (niocount = i = 0; i < objcount; i++)
- niocount += ioo[i].ioo_bufcnt;
-
- /*
- * It'd be nice to have a capsule function to indicate how many elements
- * there were in a buffer for an RMF that's declared to be an array.
- * It's easy enough to compute the number of elements here though.
- */
- remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
- if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
- &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
- GOTO(out, rc = -EFAULT);
-
- if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
- (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
- memory_pressure_set();
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST BRW WRITE");
- GOTO(out, rc = -EFAULT);
- }
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
- niocount * sizeof(*rcs));
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc != 0)
- GOTO(out, rc);
- CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
- rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
-
- tls = ost_tls_get(req);
- if (tls == NULL)
- GOTO(out_bulk, rc = -ENOMEM);
- local_nb = tls->local;
-
- rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
- if (rc != 0)
- GOTO(out_tls, rc);
-
- /*
- * If getting the lock took more time than
- * client was willing to wait, drop it. b=11330
- */
- if (cfs_time_current_sec() > req->rq_deadline ||
- OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
- no_reply = 1;
- CERROR("Dropping timed-out write from %s because locking "
- "object "DOSTID" took %ld seconds (limit was %ld).\n",
- libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
- cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
- req->rq_deadline - req->rq_arrival_time.tv_sec);
- GOTO(out_lock, rc = -ETIMEDOUT);
- }
-
- /* obd_preprw clobbers oa->valid, so save what we need */
- if (body->oa.o_valid & OBD_MD_FLCKSUM) {
- client_cksum = body->oa.o_cksum;
- if (body->oa.o_valid & OBD_MD_FLFLAGS)
- cksum_type = cksum_type_unpack(body->oa.o_flags);
- }
- if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
- mmap = 1;
-
- /* Because we already sync grant info with client when reconnect,
- * grant info will be cleared for resent req, then fed_grant and
- * total_grant will not be modified in following preprw_write */
- if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
- DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
- body->oa.o_valid &= ~OBD_MD_FLGRANT;
- }
-
- if (exp_connect_rmtclient(exp)) {
- o_uid = body->oa.o_uid;
- o_gid = body->oa.o_gid;
- }
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
-
- npages = OST_THREAD_POOL_SIZE;
- rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
- &repbody->oa, objcount, ioo, remote_nb, &npages,
- local_nb, oti, capa);
- if (rc != 0)
- GOTO(out_lock, rc);
-
- desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
- BULK_GET_SINK, OST_BULK_PORTAL);
- if (desc == NULL)
- GOTO(skip_transfer, rc = -ENOMEM);
-
- /* NB Having prepped, we must commit... */
- for (i = 0; i < npages; i++)
- ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
- local_nb[i].lnb_page_offset,
- local_nb[i].len);
-
- rc = sptlrpc_svc_prep_bulk(req, desc);
- if (rc != 0)
- GOTO(out_lock, rc);
-
- rc = target_bulk_io(exp, desc, &lwi);
- no_reply = rc != 0;
-
-skip_transfer:
- if (client_cksum != 0 && rc == 0) {
- static int cksum_counter;
- repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
- repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
- repbody->oa.o_flags |= cksum_type_pack(cksum_type);
- server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
- repbody->oa.o_cksum = server_cksum;
- cksum_counter++;
- if (unlikely(client_cksum != server_cksum)) {
- ost_warn_on_cksum(req, desc, local_nb, npages,
- client_cksum, server_cksum, mmap);
- cksum_counter = 0;
-
- } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
- CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
- cksum_counter, libcfs_id2str(req->rq_peer),
- server_cksum);
- }
- }
-
- /* Must commit after prep above in all cases */
- rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
- &repbody->oa, objcount, ioo, remote_nb, npages,
- local_nb, oti, rc);
- if (rc == -ENOTCONN)
- /* quota acquire process has been given up because
- * either the client has been evicted or the client
- * has timed out the request already */
- no_reply = 1;
-
- if (exp_connect_rmtclient(exp)) {
- repbody->oa.o_uid = o_uid;
- repbody->oa.o_gid = o_gid;
- }
-
- /*
- * Disable sending mtime back to the client. If the client locked the
- * whole object, then it has already updated the mtime on its side,
- * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
- */
- repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
-
- if (rc == 0) {
- int nob = 0;
-
- /* set per-requested niobuf return codes */
- for (i = j = 0; i < niocount; i++) {
- int len = remote_nb[i].len;
-
- nob += len;
- rcs[i] = 0;
- do {
- LASSERT(j < npages);
- if (local_nb[j].rc < 0)
- rcs[i] = local_nb[j].rc;
- len -= local_nb[j].len;
- j++;
- } while (len > 0);
- LASSERT(len == 0);
- }
- LASSERT(j == npages);
- ptlrpc_lprocfs_brw(req, nob);
- }
-
-out_lock:
- ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
-out_tls:
- ost_tls_put(req);
-out_bulk:
- if (desc)
- ptlrpc_free_bulk_nopin(desc);
-out:
- if (rc == 0) {
- oti_to_request(oti, req);
- target_committed_to_req(req);
- rc = ptlrpc_reply(req);
- } else if (!no_reply) {
- /* Only reply if there was no comms problem with bulk */
- target_committed_to_req(req);
- req->rq_status = rc;
- ptlrpc_error(req);
- } else {
- /* reply out callback would free */
- ptlrpc_req_drop_rs(req);
- LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
- "client will retry: rc %d\n",
- exp->exp_obd->obd_name,
- obd_uuid2str(&exp->exp_client_uuid),
- obd_export_nid2str(exp), rc);
- }
- memory_pressure_clr();
- RETURN(rc);
-}
-
-/**
- * Implementation of OST_SET_INFO.
- *
- * OST_SET_INFO is like ioctl(): heavily overloaded. Specifically, it takes a
- * "key" and a value RPC buffers as arguments, with the value's contents
- * interpreted according to the key.
- *
- * Value types that need swabbing have swabbing done explicitly, either here or
- * in functions called from here. This should be corrected: all swabbing should
- * be done in the capsule abstraction, as that will then allow us to move
- * swabbing exclusively to the client without having to modify server code
- * outside the capsule abstraction's implementation itself. To correct this
- * will require minor changes to the capsule abstraction; see the comments for
- * req_capsule_extend() in layout.c.
- */
-static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
-{
- struct ost_body *body = NULL, *repbody;
- char *key, *val = NULL;
- int keylen, vallen, rc = 0;
- int is_grant_shrink = 0;
- ENTRY;
-
- key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
- if (key == NULL) {
- DEBUG_REQ(D_HA, req, "no set_info key");
- RETURN(-EFAULT);
- }
- keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
- RCL_CLIENT);
-
- vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
- RCL_CLIENT);
-
- if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
- /* In this case the value is actually an RMF_OST_BODY, so we
- * transmutate the type of this PTLRPC */
- req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- if (vallen) {
- if (is_grant_shrink) {
- body = req_capsule_client_get(&req->rq_pill,
- &RMF_OST_BODY);
- if (!body)
- RETURN(-EFAULT);
-
- repbody = req_capsule_server_get(&req->rq_pill,
- &RMF_OST_BODY);
- memcpy(repbody, body, sizeof(*body));
- val = (char*)repbody;
- } else {
- val = req_capsule_client_get(&req->rq_pill,
- &RMF_SETINFO_VAL);
- }
- }
-
- if (KEY_IS(KEY_EVICT_BY_NID)) {
- if (val && vallen)
- obd_export_evict_by_nid(exp->exp_obd, val);
- GOTO(out, rc = 0);
- } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
- if (vallen < sizeof(__u32))
- RETURN(-EFAULT);
- __swab32s((__u32 *)val);
- }
-
- /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
- * a struct ost_body * value */
- rc = obd_set_info_async(req->rq_svc_thread->t_env, exp, keylen,
- key, vallen, val, NULL);
-out:
- lustre_msg_set_status(req->rq_repmsg, 0);
- RETURN(rc);
-}
-
-struct locked_region {
- cfs_list_t list;
- struct lustre_handle lh;
-};
-
-static int lock_region(struct obd_export *exp, struct obdo *oa,
- unsigned long long begin, unsigned long long end,
- cfs_list_t *locked)
-{
- struct locked_region *region = NULL;
- int rc;
-
- LASSERT(begin <= end);
- OBD_ALLOC_PTR(region);
- if (region == NULL)
- return -ENOMEM;
-
- rc = ost_lock_get(exp, oa, begin, end - begin, ®ion->lh, LCK_PR, 0);
- if (rc) {
- OBD_FREE_PTR(region);
- return rc;
- }
-
- CDEBUG(D_OTHER, "ost lock [%llu,%llu], lh=%p\n",
- begin, end, ®ion->lh);
- cfs_list_add(®ion->list, locked);
-
- return 0;
-}
-
-static int lock_zero_regions(struct obd_export *exp, struct obdo *oa,
- struct ll_user_fiemap *fiemap,
- cfs_list_t *locked)