X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=c5bca5ec1a6e1eaecc31cfbc9d00b81d735f9d9d;hp=f73f218926cdccc8df0ad646e91b9b5a129fcc51;hb=586e95a5b3f7b9525d78e7efc9f2949387fc9d54;hpb=f7a81d4797933d179f9955bb0821779d3ac9a8fe;ds=sidebyside diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index f73f218..c5bca5e 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -179,13 +179,14 @@ static inline void osc_pack_capa(struct ptlrpc_request *req, static inline void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo) { - struct ost_body *body; + struct ost_body *body; - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); - lustre_set_wire_obdo(&body->oa, oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, + oinfo->oi_oa); + osc_pack_capa(req, body, oinfo->oi_capa); } static inline void osc_set_capa_size(struct ptlrpc_request *req, @@ -211,8 +212,9 @@ static int osc_getattr_interpret(const struct lu_env *env, body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); if (body) { - CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa); + CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, + aa->aa_oi->oi_oa, &body->oa); /* This should really be sent by the OST */ aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE; @@ -290,8 +292,9 @@ static int osc_getattr(const struct lu_env *env, struct obd_export *exp, if (body == NULL) GOTO(out, rc = -EPROTO); - CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(oinfo->oi_oa, &body->oa); + CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, + &body->oa); oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; @@ -335,7 +338,8 @@ static int osc_setattr(const struct lu_env *env, struct obd_export *exp, if (body == NULL) GOTO(out, rc = -EPROTO); - lustre_get_wire_obdo(oinfo->oi_oa, &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, + &body->oa); EXIT; out: @@ -357,7 +361,8 @@ static int osc_setattr_interpret(const struct lu_env *env, if (body == NULL) GOTO(out, rc = -EPROTO); - lustre_get_wire_obdo(sa->sa_oa, &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa, + &body->oa); out: rc = sa->sa_upcall(sa->sa_cookie, rc); RETURN(rc); @@ -453,7 +458,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&body->oa, oa); + + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); ptlrpc_request_set_replen(req); @@ -473,7 +479,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, if (body == NULL) GOTO(out_req, rc = -EPROTO); - lustre_get_wire_obdo(oa, &body->oa); + CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); oa->o_blksize = cli_brw_size(exp->exp_obd); oa->o_valid |= OBD_MD_FLBLKSZ; @@ -528,10 +535,11 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - lustre_set_wire_obdo(&body->oa, oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, + oinfo->oi_oa); + osc_pack_capa(req, body, oinfo->oi_capa); ptlrpc_request_set_replen(req); @@ -604,11 +612,12 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, RETURN(rc); } - /* overload the size and blocks fields in the oa with start/end */ - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - lustre_set_wire_obdo(&body->oa, oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + /* overload the size and blocks fields in the oa with start/end */ + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, + oinfo->oi_oa); + osc_pack_capa(req, body, oinfo->oi_capa); ptlrpc_request_set_replen(req); req->rq_interpret_reply = osc_sync_interpret; @@ -649,8 +658,8 @@ static int osc_sync(const struct lu_env *env, struct obd_export *exp, * @objid. Found locks are added into @cancel list. Returns the amount of * locks added to @cancels list. */ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, - cfs_list_t *cancels, - ldlm_mode_t mode, int lock_flags) + cfs_list_t *cancels, + ldlm_mode_t mode, __u64 lock_flags) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; struct ldlm_res_id res_id; @@ -681,32 +690,32 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, } static int osc_destroy_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *data, - int rc) + struct ptlrpc_request *req, void *data, + int rc) { - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - cfs_atomic_dec(&cli->cl_destroy_in_flight); - cfs_waitq_signal(&cli->cl_destroy_waitq); - return 0; + cfs_atomic_dec(&cli->cl_destroy_in_flight); + wake_up(&cli->cl_destroy_waitq); + return 0; } static int osc_can_send_destroy(struct client_obd *cli) { - if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <= - cli->cl_max_rpcs_in_flight) { - /* The destroy request can be sent */ - return 1; - } - if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) < - cli->cl_max_rpcs_in_flight) { - /* - * The counter has been modified between the two atomic - * operations. - */ - cfs_waitq_signal(&cli->cl_destroy_waitq); - } - return 0; + if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <= + cli->cl_max_rpcs_in_flight) { + /* The destroy request can be sent */ + return 1; + } + if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) < + cli->cl_max_rpcs_in_flight) { + /* + * The counter has been modified between the two atomic + * operations. + */ + wake_up(&cli->cl_destroy_waitq); + } + return 0; } int osc_create(const struct lu_env *env, struct obd_export *exp, @@ -781,11 +790,11 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); - if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) - oa->o_lcookie = *oti->oti_logcookies; - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - lustre_set_wire_obdo(&body->oa, oa); + if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) + oa->o_lcookie = *oti->oti_logcookies; + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); osc_pack_capa(req, body, (struct obd_capa *)capa); ptlrpc_request_set_replen(req); @@ -829,13 +838,16 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, CERROR("dirty %lu - %lu > dirty_max %lu\n", cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); oa->o_undirty = 0; - } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) - + } else if (unlikely(cfs_atomic_read(&obd_unstable_pages) + + cfs_atomic_read(&obd_dirty_pages) - cfs_atomic_read(&obd_dirty_transit_pages) > (long)(obd_max_dirty_pages + 1))) { /* The cfs_atomic_read() allowing the cfs_atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). */ - CERROR("dirty %d - %d > system dirty_max %d\n", + CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n", + cli->cl_import->imp_obd->obd_name, + cfs_atomic_read(&obd_unstable_pages), cfs_atomic_read(&obd_dirty_pages), cfs_atomic_read(&obd_dirty_transit_pages), obd_max_dirty_pages); @@ -846,7 +858,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, oa->o_undirty = 0; } else { long max_in_flight = (cli->cl_max_pages_per_rpc << - CFS_PAGE_SHIFT)* + PAGE_CACHE_SHIFT) * (cli->cl_max_rpcs_in_flight + 1); oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); } @@ -928,11 +940,11 @@ static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) static int osc_shrink_grant(struct client_obd *cli) { __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) * - (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT); + (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT); client_obd_list_lock(&cli->cl_loi_list_lock); if (cli->cl_avail_grant <= target_bytes) - target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; client_obd_list_unlock(&cli->cl_loi_list_lock); return osc_shrink_grant_to_target(cli, target_bytes); @@ -948,8 +960,8 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) /* Don't shrink if we are already above or below the desired limit * We don't want to shrink below a single RPC, as that will negatively * impact block allocation and long-term performance. */ - if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) - target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT) + target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; if (target_bytes >= cli->cl_avail_grant) { client_obd_list_unlock(&cli->cl_loi_list_lock); @@ -996,7 +1008,7 @@ static int osc_should_shrink_grant(struct client_obd *client) /* Get the current RPC size directly, instead of going via: * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export) * Keep comment here so that it can be found by searching. */ - int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; if (client->cl_import->imp_state == LUSTRE_IMP_FULL && client->cl_avail_grant > brw_size) @@ -1070,7 +1082,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) } /* determine the appropriate chunk size used by osc_extent. */ - cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize); + cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize); client_obd_list_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." @@ -1096,29 +1108,29 @@ static void handle_short_read(int nob_read, obd_count page_count, while (nob_read > 0) { LASSERT (page_count > 0); - if (pga[i]->count > nob_read) { - /* EOF inside this page */ - ptr = cfs_kmap(pga[i]->pg) + - (pga[i]->off & ~CFS_PAGE_MASK); - memset(ptr + nob_read, 0, pga[i]->count - nob_read); - cfs_kunmap(pga[i]->pg); - page_count--; - i++; - break; - } + if (pga[i]->count > nob_read) { + /* EOF inside this page */ + ptr = kmap(pga[i]->pg) + + (pga[i]->off & ~CFS_PAGE_MASK); + memset(ptr + nob_read, 0, pga[i]->count - nob_read); + kunmap(pga[i]->pg); + page_count--; + i++; + break; + } nob_read -= pga[i]->count; page_count--; i++; } - /* zero remaining pages */ - while (page_count-- > 0) { - ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK); - memset(ptr, 0, pga[i]->count); - cfs_kunmap(pga[i]->pg); - i++; - } + /* zero remaining pages */ + while (page_count-- > 0) { + ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK); + memset(ptr, 0, pga[i]->count); + kunmap(pga[i]->pg); + i++; + } } static int check_write_rcs(struct ptlrpc_request *req, @@ -1203,10 +1215,10 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, * simulate an OST->client data error */ if (i == 0 && opc == OST_READ && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) { - unsigned char *ptr = cfs_kmap(pga[i]->pg); + unsigned char *ptr = kmap(pga[i]->pg); int off = pga[i]->off & ~CFS_PAGE_MASK; memcpy(ptr + off, "bad1", min(4, nob)); - cfs_kunmap(pga[i]->pg); + kunmap(pga[i]->pg); } cfs_crypto_hash_update_page(hdesc, pga[i]->pg, pga[i]->off & ~CFS_PAGE_MASK, @@ -1305,7 +1317,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); LASSERT(body != NULL && ioobj != NULL && niobuf != NULL); - lustre_set_wire_obdo(&body->oa, oa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); obdo_to_ioobj(oa, ioobj); ioobj->ioo_bufcnt = niocount; @@ -1324,13 +1336,13 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, LASSERT(pg->count > 0); /* make sure there is no gap in the middle of page array */ - LASSERTF(page_count == 1 || - (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) && - ergo(i > 0 && i < page_count - 1, - poff == 0 && pg->count == CFS_PAGE_SIZE) && - ergo(i == page_count - 1, poff == 0)), - "i: %d/%d pg: %p off: "LPU64", count: %u\n", - i, page_count, pg, pg->off, pg->count); + LASSERTF(page_count == 1 || + (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) && + ergo(i > 0 && i < page_count - 1, + poff == 0 && pg->count == PAGE_CACHE_SIZE) && + ergo(i == page_count - 1, poff == 0)), + "i: %d/%d pg: %p off: "LPU64", count: %u\n", + i, page_count, pg, pg->off, pg->count); #ifdef __linux__ LASSERTF(i == 0 || pg->off > pg_prev->off, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 @@ -1636,8 +1648,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) rc = 0; } out: - if (rc >= 0) - lustre_get_wire_obdo(aa->aa_oa, &body->oa); + if (rc >= 0) + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, + aa->aa_oa, &body->oa); RETURN(rc); } @@ -1647,16 +1660,16 @@ static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa, obd_count page_count, struct brw_page **pga, struct obd_capa *ocapa) { - struct ptlrpc_request *req; - int rc; - cfs_waitq_t waitq; - int generation, resends = 0; - struct l_wait_info lwi; + struct ptlrpc_request *req; + int rc; + wait_queue_head_t waitq; + int generation, resends = 0; + struct l_wait_info lwi; - ENTRY; + ENTRY; - cfs_waitq_init(&waitq); - generation = exp->exp_obd->u.cli.cl_import->imp_generation; + init_waitqueue_head(&waitq); + generation = exp->exp_obd->u.cli.cl_import->imp_generation; restart_bulk: rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm, @@ -1748,6 +1761,7 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, aa->aa_resends++; new_req->rq_interpret_reply = request->rq_interpret_reply; new_req->rq_async_args = request->rq_async_args; + new_req->rq_commit_cb = request->rq_commit_cb; /* cap resend delay to the current request timeout, this is similar to * what ptlrpc does (see after_reply()) */ if (aa->aa_resends > new_req->rq_timeout) @@ -1830,7 +1844,7 @@ static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages) if (pages == 0) /* that's all */ return count; - if (offset + pg[i]->count < CFS_PAGE_SIZE) + if (offset + pg[i]->count < PAGE_CACHE_SIZE) return count; /* doesn't end on page boundary */ i++; @@ -2041,6 +2055,23 @@ static int brw_interpret(const struct lu_env *env, RETURN(rc); } +static void brw_commit(struct ptlrpc_request *req) +{ + spin_lock(&req->rq_lock); + /* If osc_inc_unstable_pages (via osc_extent_finish) races with + * this called via the rq_commit_cb, I need to ensure + * osc_dec_unstable_pages is still called. Otherwise unstable + * pages may be leaked. */ + if (req->rq_unstable) { + spin_unlock(&req->rq_lock); + osc_dec_unstable_pages(req); + spin_lock(&req->rq_lock); + } else { + req->rq_committed = 1; + } + spin_unlock(&req->rq_lock); +} + /** * Build an RPC by the list of extent @ext_list. The caller must ensure * that the total pages in this list are NOT over max pages per RPC. @@ -2089,7 +2120,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, oap->oap_count; else LASSERT(oap->oap_page_off + oap->oap_count == - CFS_PAGE_SIZE); + PAGE_CACHE_SIZE); } } @@ -2123,7 +2154,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, pga[i] = &oap->oap_brw_page; pga[i]->off = oap->oap_obj_off + oap->oap_page_off; CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n", - pga[i]->pg, cfs_page_index(oap->oap_page), oap, + pga[i]->pg, page_index(oap->oap_page), oap, pga[i]->flag); i++; cl_req_page_add(env, clerq, page); @@ -2152,7 +2183,9 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, GOTO(out, rc); } + req->rq_commit_cb = brw_commit; req->rq_interpret_reply = brw_interpret; + if (mem_tight != 0) req->rq_memalloc = 1; @@ -2191,7 +2224,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, tmp->oap_request = ptlrpc_request_addref(req); client_obd_list_lock(&cli->cl_loi_list_lock); - starting_offset >>= CFS_PAGE_SHIFT; + starting_offset >>= PAGE_CACHE_SHIFT; if (cmd == OBD_BRW_READ) { cli->cl_r_in_flight++; lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); @@ -2344,6 +2377,8 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, &RMF_DLM_REP); LASSERT(rep != NULL); + rep->lock_policy_res1 = + ptlrpc_status_ntoh(rep->lock_policy_res1); if (rep->lock_policy_res1) rc = rep->lock_policy_res1; } @@ -2424,8 +2459,8 @@ static int osc_enqueue_interpret(const struct lu_env *env, } void osc_update_enqueue(struct lustre_handle *lov_lockhp, - struct lov_oinfo *loi, int flags, - struct ost_lvb *lvb, __u32 mode, int rc) + struct lov_oinfo *loi, __u64 flags, + struct ost_lvb *lvb, __u32 mode, int rc) { struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); @@ -2479,19 +2514,19 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; * release locks just after they are obtained. */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, __u64 *flags, ldlm_policy_data_t *policy, - struct ost_lvb *lvb, int kms_valid, - obd_enqueue_update_f upcall, void *cookie, - struct ldlm_enqueue_info *einfo, - struct lustre_handle *lockh, - struct ptlrpc_request_set *rqset, int async, int agl) -{ - struct obd_device *obd = exp->exp_obd; - struct ptlrpc_request *req = NULL; - int intent = *flags & LDLM_FL_HAS_INTENT; - int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); - ldlm_mode_t mode; - int rc; - ENTRY; + struct ost_lvb *lvb, int kms_valid, + obd_enqueue_update_f upcall, void *cookie, + struct ldlm_enqueue_info *einfo, + struct lustre_handle *lockh, + struct ptlrpc_request_set *rqset, int async, int agl) +{ + struct obd_device *obd = exp->exp_obd; + struct ptlrpc_request *req = NULL; + int intent = *flags & LDLM_FL_HAS_INTENT; + __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); + ldlm_mode_t mode; + int rc; + ENTRY; /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. */ @@ -2527,7 +2562,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (mode) { struct ldlm_lock *matched = ldlm_handle2lock(lockh); - if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) { + if ((agl != 0) && !ldlm_is_lvb_ready(matched)) { /* For AGL, if enqueue RPC is sent but the lock is not * granted, then skip to process this strpe. * Return -ECANCELED to tell the caller. */ @@ -2640,14 +2675,14 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, } int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, - __u32 type, ldlm_policy_data_t *policy, __u32 mode, - int *flags, void *data, struct lustre_handle *lockh, - int unref) + __u32 type, ldlm_policy_data_t *policy, __u32 mode, + __u64 *flags, void *data, struct lustre_handle *lockh, + int unref) { - struct obd_device *obd = exp->exp_obd; - int lflags = *flags; - ldlm_mode_t rc; - ENTRY; + struct obd_device *obd = exp->exp_obd; + __u64 lflags = *flags; + ldlm_mode_t rc; + ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) RETURN(-EIO); @@ -2879,7 +2914,7 @@ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) /* we only need the header part from user space to get lmm_magic and * lmm_stripe_count, (the header part is common to v1 and v3) */ lum_size = sizeof(struct lov_user_md_v1); - if (cfs_copy_from_user(&lum, lump, lum_size)) + if (copy_from_user(&lum, lump, lum_size)) RETURN(-EFAULT); if ((lum.lmm_magic != LOV_USER_MAGIC_V1) && @@ -2911,15 +2946,15 @@ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) } lumk->lmm_oi = lsm->lsm_oi; - lumk->lmm_stripe_count = 1; + lumk->lmm_stripe_count = 1; - if (cfs_copy_to_user(lump, lumk, lum_size)) - rc = -EFAULT; + if (copy_to_user(lump, lumk, lum_size)) + rc = -EFAULT; - if (lumk != &lum) - OBD_FREE(lumk, lum_size); + if (lumk != &lum) + OBD_FREE(lumk, lum_size); - RETURN(rc); + RETURN(rc); } @@ -2931,10 +2966,10 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, int err = 0; ENTRY; - if (!cfs_try_module_get(THIS_MODULE)) { - CERROR("Can't get module. Is it alive?"); - return -EINVAL; - } + if (!try_module_get(THIS_MODULE)) { + CERROR("Can't get module. Is it alive?"); + return -EINVAL; + } switch (cmd) { case OBD_IOC_LOV_GET_CONFIG: { char *buf; @@ -2969,7 +3004,7 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid)); - err = cfs_copy_to_user((void *)uarg, buf, len); + err = copy_to_user((void *)uarg, buf, len); if (err) err = -EFAULT; obd_ioctl_freedata(buf, len); @@ -2999,14 +3034,14 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case OBD_IOC_PING_TARGET: err = ptlrpc_obd_ping(obd); GOTO(out, err); - default: - CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", - cmd, cfs_curproc_comm()); - GOTO(out, err = -ENOTTY); - } + default: + CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", + cmd, current_comm()); + GOTO(out, err = -ENOTTY); + } out: - cfs_module_put(THIS_MODULE); - return err; + module_put(THIS_MODULE); + return err; } static int osc_get_info(const struct lu_env *env, struct obd_export *exp, @@ -3059,15 +3094,52 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp, ptlrpc_req_finished(req); RETURN(rc); } else if (KEY_IS(KEY_FIEMAP)) { - struct ptlrpc_request *req; - struct ll_user_fiemap *reply; - char *tmp; - int rc; + struct ll_fiemap_info_key *fm_key = + (struct ll_fiemap_info_key *)key; + struct ldlm_res_id res_id; + ldlm_policy_data_t policy; + struct lustre_handle lockh; + ldlm_mode_t mode = 0; + struct ptlrpc_request *req; + struct ll_user_fiemap *reply; + char *tmp; + int rc; + + if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC)) + goto skip_locking; + + policy.l_extent.start = fm_key->fiemap.fm_start & + CFS_PAGE_MASK; + + if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <= + fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1) + policy.l_extent.end = OBD_OBJECT_EOF; + else + policy.l_extent.end = (fm_key->fiemap.fm_start + + fm_key->fiemap.fm_length + + PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK; + + ostid_build_res_name(&fm_key->oa.o_oi, &res_id); + mode = ldlm_lock_match(exp->exp_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED | + LDLM_FL_LVB_READY, + &res_id, LDLM_EXTENT, &policy, + LCK_PR | LCK_PW, &lockh, 0); + if (mode) { /* lock is cached on client */ + if (mode != LCK_PR) { + ldlm_lock_addref(&lockh, LCK_PR); + ldlm_lock_decref(&lockh, LCK_PW); + } + } else { /* no cached lock, needs acquire lock on server side */ + fm_key->oa.o_valid |= OBD_MD_FLFLAGS; + fm_key->oa.o_flags |= OBD_FL_SRVLOCK; + } +skip_locking: req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GET_INFO_FIEMAP); if (req == NULL) - RETURN(-ENOMEM); + GOTO(drop_lock, rc = -ENOMEM); req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY, RCL_CLIENT, keylen); @@ -3079,7 +3151,7 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp, rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); if (rc) { ptlrpc_request_free(req); - RETURN(rc); + GOTO(drop_lock, rc); } tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY); @@ -3090,16 +3162,18 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp, ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); if (rc) - GOTO(out1, rc); + GOTO(fini_req, rc); reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL); if (reply == NULL) - GOTO(out1, rc = -EPROTO); + GOTO(fini_req, rc = -EPROTO); memcpy(val, reply, *vallen); - out1: +fini_req: ptlrpc_req_finished(req); - +drop_lock: + if (mode) + ldlm_lock_decref(&lockh, LCK_PR); RETURN(rc); } @@ -3637,6 +3711,8 @@ int __init osc_init(void) CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); rc = lu_kmem_init(osc_caches); + if (rc) + RETURN(rc); lprocfs_osc_init_vars(&lvars);