X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=3416fe38e254d79bde957b9693216e806ab75a26;hp=c5bca5ec1a6e1eaecc31cfbc9d00b81d735f9d9d;hb=010cbbd3de5859064dc0bcbb1bb9a76fb6fc8f13;hpb=586e95a5b3f7b9525d78e7efc9f2949387fc9d54 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index c5bca5e..3416fe3 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -46,8 +46,6 @@ #include #include #include -#include -#include #ifdef __CYGWIN__ # include @@ -55,6 +53,7 @@ #include #include +#include #include #include #include @@ -62,41 +61,52 @@ #include "osc_internal.h" #include "osc_cl_internal.h" -static void osc_release_ppga(struct brw_page **ppga, obd_count count); -static int brw_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *data, int rc); -int osc_cleanup(struct obd_device *obd); +struct osc_brw_async_args { + struct obdo *aa_oa; + int aa_requested_nob; + int aa_nio_count; + obd_count aa_page_count; + int aa_resends; + struct brw_page **aa_ppga; + struct client_obd *aa_cli; + struct list_head aa_oaps; + struct list_head aa_exts; + struct obd_capa *aa_ocapa; + struct cl_req *aa_clerq; +}; -/* Pack OSC object metadata for disk storage (LE byte order). */ -static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, - struct lov_stripe_md *lsm) -{ - int lmm_size; - ENTRY; +#define osc_grant_args osc_brw_async_args - lmm_size = sizeof(**lmmp); - if (lmmp == NULL) - RETURN(lmm_size); +struct osc_async_args { + struct obd_info *aa_oi; +}; - if (*lmmp != NULL && lsm == NULL) { - OBD_FREE(*lmmp, lmm_size); - *lmmp = NULL; - RETURN(0); - } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) { - RETURN(-EBADF); - } +struct osc_setattr_args { + struct obdo *sa_oa; + obd_enqueue_update_f sa_upcall; + void *sa_cookie; +}; - if (*lmmp == NULL) { - OBD_ALLOC(*lmmp, lmm_size); - if (*lmmp == NULL) - RETURN(-ENOMEM); - } +struct osc_fsync_args { + struct obd_info *fa_oi; + obd_enqueue_update_f fa_upcall; + void *fa_cookie; +}; - if (lsm) - ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi); +struct osc_enqueue_args { + struct obd_export *oa_exp; + __u64 *oa_flags; + obd_enqueue_update_f oa_upcall; + void *oa_cookie; + struct ost_lvb *oa_lvb; + struct lustre_handle *oa_lockh; + struct ldlm_enqueue_info *oa_ei; + unsigned int oa_agl:1; +}; - RETURN(lmm_size); -} +static void osc_release_ppga(struct brw_page **ppga, obd_count count); +static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, + void *data, int rc); /* Unpack OSC object metadata from disk storage (LE byte order). */ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, @@ -557,17 +567,6 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, RETURN(0); } -static int osc_punch(const struct lu_env *env, struct obd_export *exp, - struct obd_info *oinfo, struct obd_trans_info *oti, - struct ptlrpc_request_set *rqset) -{ - oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start; - oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end; - oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - return osc_punch_base(exp, oinfo, - oinfo->oi_cb_up, oinfo, rqset); -} - static int osc_sync_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *arg, int rc) @@ -636,29 +635,11 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, RETURN (0); } -static int osc_sync(const struct lu_env *env, struct obd_export *exp, - struct obd_info *oinfo, obd_size start, obd_size end, - struct ptlrpc_request_set *set) -{ - ENTRY; - - if (!oinfo->oi_oa) { - CDEBUG(D_INFO, "oa NULL\n"); - RETURN(-EINVAL); - } - - oinfo->oi_oa->o_size = start; - oinfo->oi_oa->o_blocks = end; - oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); - - RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set)); -} - /* Find and cancel locally locks matched by @mode in the resource found by * @objid. Found locks are added into @cancel list. Returns the amount of * locks added to @cancels list. */ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, - cfs_list_t *cancels, + struct list_head *cancels, ldlm_mode_t mode, __u64 lock_flags) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; @@ -695,19 +676,19 @@ static int osc_destroy_interpret(const struct lu_env *env, { struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - cfs_atomic_dec(&cli->cl_destroy_in_flight); + atomic_dec(&cli->cl_destroy_in_flight); wake_up(&cli->cl_destroy_waitq); return 0; } static int osc_can_send_destroy(struct client_obd *cli) { - if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <= + if (atomic_inc_return(&cli->cl_destroy_in_flight) <= cli->cl_max_rpcs_in_flight) { /* The destroy request can be sent */ return 1; } - if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) < + if (atomic_dec_return(&cli->cl_destroy_in_flight) < cli->cl_max_rpcs_in_flight) { /* * The counter has been modified between the two atomic @@ -761,7 +742,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; struct ost_body *body; - CFS_LIST_HEAD(cancels); + struct list_head cancels = LIST_HEAD_INIT(cancels); int rc, count; ENTRY; @@ -838,18 +819,18 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, CERROR("dirty %lu - %lu > dirty_max %lu\n", cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); oa->o_undirty = 0; - } else if (unlikely(cfs_atomic_read(&obd_unstable_pages) + - cfs_atomic_read(&obd_dirty_pages) - - cfs_atomic_read(&obd_dirty_transit_pages) > + } else if (unlikely(atomic_read(&obd_unstable_pages) + + atomic_read(&obd_dirty_pages) - + atomic_read(&obd_dirty_transit_pages) > (long)(obd_max_dirty_pages + 1))) { - /* The cfs_atomic_read() allowing the cfs_atomic_inc() are + /* The atomic_read() allowing the atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). */ CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n", cli->cl_import->imp_obd->obd_name, - cfs_atomic_read(&obd_unstable_pages), - cfs_atomic_read(&obd_dirty_pages), - cfs_atomic_read(&obd_dirty_transit_pages), + atomic_read(&obd_unstable_pages), + atomic_read(&obd_dirty_pages), + atomic_read(&obd_dirty_transit_pages), obd_max_dirty_pages); oa->o_undirty = 0; } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) { @@ -1021,14 +1002,13 @@ static int osc_should_shrink_grant(struct client_obd *client) static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data) { - struct client_obd *client; + struct client_obd *client; - cfs_list_for_each_entry(client, &item->ti_obd_list, - cl_grant_shrink_list) { - if (osc_should_shrink_grant(client)) - osc_shrink_grant(client); - } - return 0; + list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) { + if (osc_should_shrink_grant(client)) + osc_shrink_grant(client); + } + return 0; } static int osc_add_shrink_grant(struct client_obd *client) @@ -1090,7 +1070,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits); if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && - cfs_list_empty(&cli->cl_grant_shrink_list)) + list_empty(&cli->cl_grant_shrink_list)) osc_add_shrink_grant(cli); } @@ -1172,8 +1152,9 @@ static int check_write_rcs(struct ptlrpc_request *req, static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) { if (p1->flag != p2->flag) { - unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE| - OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA); + unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE | + OBD_BRW_SYNC | OBD_BRW_ASYNC | + OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC); /* warn if we try to combine flags that we don't know to be * safe to combine */ @@ -1441,7 +1422,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, aa->aa_resends = 0; aa->aa_ppga = pga; aa->aa_cli = cli; - CFS_INIT_LIST_HEAD(&aa->aa_oaps); + INIT_LIST_HEAD(&aa->aa_oaps); if (ocapa && reserve) aa->aa_ocapa = capa_get(ocapa); @@ -1604,12 +1585,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) router = libcfs_nid2str(req->rq_bulk->bd_sender); } - if (server_cksum == ~0 && rc > 0) { - CERROR("Protocol error: server %s set the 'checksum' " - "bit, but didn't send a checksum. Not fatal, " - "but please notify on http://bugs.whamcloud.com/\n", - libcfs_nid2str(peer->nid)); - } else if (server_cksum != client_cksum) { + if (server_cksum != client_cksum) { LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " "%s%s%s inode "DFID" object "DOSTID " extent ["LPU64"-"LPU64"]\n", @@ -1655,76 +1631,6 @@ out: RETURN(rc); } -static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, - obd_count page_count, struct brw_page **pga, - struct obd_capa *ocapa) -{ - struct ptlrpc_request *req; - int rc; - wait_queue_head_t waitq; - int generation, resends = 0; - struct l_wait_info lwi; - - ENTRY; - - init_waitqueue_head(&waitq); - generation = exp->exp_obd->u.cli.cl_import->imp_generation; - -restart_bulk: - rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm, - page_count, pga, &req, ocapa, 0, resends); - if (rc != 0) - return (rc); - - if (resends) { - req->rq_generation_set = 1; - req->rq_import_generation = generation; - req->rq_sent = cfs_time_current_sec() + resends; - } - - rc = ptlrpc_queue_wait(req); - - if (rc == -ETIMEDOUT && req->rq_resend) { - DEBUG_REQ(D_HA, req, "BULK TIMEOUT"); - ptlrpc_req_finished(req); - goto restart_bulk; - } - - rc = osc_brw_fini_request(req, rc); - - ptlrpc_req_finished(req); - /* When server return -EINPROGRESS, client should always retry - * regardless of the number of times the bulk was resent already.*/ - if (osc_recoverable_error(rc)) { - resends++; - if (rc != -EINPROGRESS && - !client_should_resend(resends, &exp->exp_obd->u.cli)) { - CERROR("%s: too many resend retries for object: " - ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name, - POSTID(&oa->o_oi), rc); - goto out; - } - if (generation != - exp->exp_obd->u.cli.cl_import->imp_generation) { - CDEBUG(D_HA, "%s: resend cross eviction for object: " - ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name, - POSTID(&oa->o_oi), rc); - goto out; - } - - lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, - NULL); - l_wait_event(waitq, 0, &lwi); - - goto restart_bulk; - } -out: - if (rc == -EAGAIN || rc == -EINPROGRESS) - rc = -EIO; - RETURN (rc); -} - static int osc_brw_redo_request(struct ptlrpc_request *request, struct osc_brw_async_args *aa, int rc) { @@ -1745,7 +1651,7 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, if (rc) RETURN(rc); - cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { + list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { if (oap->oap_request != NULL) { LASSERTF(request == oap->oap_request, "request %p != oap_request %p\n", @@ -1773,13 +1679,13 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, new_aa = ptlrpc_req_async_args(new_req); - CFS_INIT_LIST_HEAD(&new_aa->aa_oaps); - cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps); - CFS_INIT_LIST_HEAD(&new_aa->aa_exts); - cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts); + INIT_LIST_HEAD(&new_aa->aa_oaps); + list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps); + INIT_LIST_HEAD(&new_aa->aa_exts); + list_splice_init(&aa->aa_exts, &new_aa->aa_exts); new_aa->aa_resends = aa->aa_resends; - cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { + list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { if (oap->oap_request) { ptlrpc_req_finished(oap->oap_request); oap->oap_request = ptlrpc_request_addref(new_req); @@ -1830,133 +1736,18 @@ static void sort_brw_pages(struct brw_page **array, int num) } while (stride > 1); } -static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages) -{ - int count = 1; - int offset; - int i = 0; - - LASSERT (pages > 0); - offset = pg[i]->off & ~CFS_PAGE_MASK; - - for (;;) { - pages--; - if (pages == 0) /* that's all */ - return count; - - if (offset + pg[i]->count < PAGE_CACHE_SIZE) - return count; /* doesn't end on page boundary */ - - i++; - offset = pg[i]->off & ~CFS_PAGE_MASK; - if (offset != 0) /* doesn't start on page boundary */ - return count; - - count++; - } -} - -static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count) -{ - struct brw_page **ppga; - int i; - - OBD_ALLOC(ppga, sizeof(*ppga) * count); - if (ppga == NULL) - return NULL; - - for (i = 0; i < count; i++) - ppga[i] = pga + i; - return ppga; -} - static void osc_release_ppga(struct brw_page **ppga, obd_count count) { LASSERT(ppga != NULL); OBD_FREE(ppga, sizeof(*ppga) * count); } -static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, - obd_count page_count, struct brw_page *pga, - struct obd_trans_info *oti) -{ - struct obdo *saved_oa = NULL; - struct brw_page **ppga, **orig; - struct obd_import *imp = class_exp2cliimp(exp); - struct client_obd *cli; - int rc, page_count_orig; - ENTRY; - - LASSERT((imp != NULL) && (imp->imp_obd != NULL)); - cli = &imp->imp_obd->u.cli; - - if (cmd & OBD_BRW_CHECK) { - /* The caller just wants to know if there's a chance that this - * I/O can succeed */ - - if (imp->imp_invalid) - RETURN(-EIO); - RETURN(0); - } - - /* test_brw with a failed create can trip this, maybe others. */ - LASSERT(cli->cl_max_pages_per_rpc); - - rc = 0; - - orig = ppga = osc_build_ppga(pga, page_count); - if (ppga == NULL) - RETURN(-ENOMEM); - page_count_orig = page_count; - - sort_brw_pages(ppga, page_count); - while (page_count) { - obd_count pages_per_brw; - - if (page_count > cli->cl_max_pages_per_rpc) - pages_per_brw = cli->cl_max_pages_per_rpc; - else - pages_per_brw = page_count; - - pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw); - - if (saved_oa != NULL) { - /* restore previously saved oa */ - *oinfo->oi_oa = *saved_oa; - } else if (page_count > pages_per_brw) { - /* save a copy of oa (brw will clobber it) */ - OBDO_ALLOC(saved_oa); - if (saved_oa == NULL) - GOTO(out, rc = -ENOMEM); - *saved_oa = *oinfo->oi_oa; - } - - rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md, - pages_per_brw, ppga, oinfo->oi_capa); - - if (rc != 0) - break; - - page_count -= pages_per_brw; - ppga += pages_per_brw; - } - -out: - osc_release_ppga(orig, page_count_orig); - - if (saved_oa != NULL) - OBDO_FREE(saved_oa); - - RETURN(rc); -} - static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { struct osc_brw_async_args *aa = data; struct osc_extent *ext; struct osc_extent *tmp; - struct cl_object *obj = NULL; struct client_obd *cli = aa->aa_cli; ENTRY; @@ -1992,24 +1783,17 @@ static int brw_interpret(const struct lu_env *env, aa->aa_ocapa = NULL; } - cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { - if (obj == NULL && rc == 0) { - obj = osc2cl(ext->oe_obj); - cl_object_get(obj); - } - - cfs_list_del_init(&ext->oe_link); - osc_extent_finish(env, ext, 1, rc); - } - LASSERT(cfs_list_empty(&aa->aa_exts)); - LASSERT(cfs_list_empty(&aa->aa_oaps)); - - if (obj != NULL) { + if (rc == 0) { struct obdo *oa = aa->aa_oa; - struct cl_attr *attr = &osc_env_info(env)->oti_attr; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; unsigned long valid = 0; + struct cl_object *obj; + struct osc_async_page *last; - LASSERT(rc == 0); + last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]); + obj = osc2cl(last->oap_obj); + + cl_object_attr_lock(obj); if (oa->o_valid & OBD_MD_FLBLOCKS) { attr->cat_blocks = oa->o_blocks; valid |= CAT_BLOCKS; @@ -2026,15 +1810,38 @@ static int brw_interpret(const struct lu_env *env, attr->cat_ctime = oa->o_ctime; valid |= CAT_CTIME; } - if (valid != 0) { - cl_object_attr_lock(obj); - cl_object_attr_set(env, obj, attr, valid); - cl_object_attr_unlock(obj); + + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { + struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; + loff_t last_off = last->oap_count + last->oap_obj_off; + + /* Change file size if this is an out of quota or + * direct IO write and it extends the file size */ + if (loi->loi_lvb.lvb_size < last_off) { + attr->cat_size = last_off; + valid |= CAT_SIZE; + } + /* Extend KMS if it's not a lockless write */ + if (loi->loi_kms < last_off && + oap2osc_page(last)->ops_srvlock == 0) { + attr->cat_kms = last_off; + valid |= CAT_KMS; + } } - cl_object_put(env, obj); + + if (valid != 0) + cl_object_attr_set(env, obj, attr, valid); + cl_object_attr_unlock(obj); } OBDO_FREE(aa->aa_oa); + list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { + list_del_init(&ext->oe_link); + osc_extent_finish(env, ext, 1, rc); + } + LASSERT(list_empty(&aa->aa_exts)); + LASSERT(list_empty(&aa->aa_oaps)); + cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : req->rq_bulk->bd_nob_transferred); osc_release_ppga(aa->aa_ppga, aa->aa_page_count); @@ -2057,19 +1864,20 @@ static int brw_interpret(const struct lu_env *env, static void brw_commit(struct ptlrpc_request *req) { - spin_lock(&req->rq_lock); /* If osc_inc_unstable_pages (via osc_extent_finish) races with * this called via the rq_commit_cb, I need to ensure * osc_dec_unstable_pages is still called. Otherwise unstable * pages may be leaked. */ - if (req->rq_unstable) { + spin_lock(&req->rq_lock); + if (likely(req->rq_unstable)) { + req->rq_unstable = 0; spin_unlock(&req->rq_lock); + osc_dec_unstable_pages(req); - spin_lock(&req->rq_lock); } else { req->rq_committed = 1; + spin_unlock(&req->rq_lock); } - spin_unlock(&req->rq_lock); } /** @@ -2078,7 +1886,7 @@ static void brw_commit(struct ptlrpc_request *req) * Extents in the list must be in OES_RPC state. */ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, - cfs_list_t *ext_list, int cmd, pdl_policy_t pol) + struct list_head *ext_list, int cmd, pdl_policy_t pol) { struct ptlrpc_request *req = NULL; struct osc_extent *ext; @@ -2099,18 +1907,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, int page_count = 0; int i; int rc; - CFS_LIST_HEAD(rpc_list); + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); ENTRY; - LASSERT(!cfs_list_empty(ext_list)); + LASSERT(!list_empty(ext_list)); /* add pages into rpc_list to build BRW rpc */ - cfs_list_for_each_entry(ext, ext_list, oe_link) { + list_for_each_entry(ext, ext_list, oe_link) { LASSERT(ext->oe_state == OES_RPC); mem_tight |= ext->oe_memalloc; - cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { + list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { ++page_count; - cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); + list_add_tail(&oap->oap_rpc_item, &rpc_list); if (starting_offset > oap->oap_obj_off) starting_offset = oap->oap_obj_off; else @@ -2140,7 +1948,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, GOTO(out, rc = -ENOMEM); i = 0; - cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) { + list_for_each_entry(oap, &rpc_list, oap_rpc_item) { struct cl_page *page = oap2cl_page(oap); if (clerq == NULL) { clerq = cl_req_alloc(env, page, crt, @@ -2201,16 +2009,16 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); aa = ptlrpc_req_async_args(req); - CFS_INIT_LIST_HEAD(&aa->aa_oaps); - cfs_list_splice_init(&rpc_list, &aa->aa_oaps); - CFS_INIT_LIST_HEAD(&aa->aa_exts); - cfs_list_splice_init(ext_list, &aa->aa_exts); + INIT_LIST_HEAD(&aa->aa_oaps); + list_splice_init(&rpc_list, &aa->aa_oaps); + INIT_LIST_HEAD(&aa->aa_exts); + list_splice_init(ext_list, &aa->aa_exts); aa->aa_clerq = clerq; /* queued sync pages can be torn down while the pages * were between the pending list and the rpc */ tmp = NULL; - cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { + list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { /* only one oap gets a request reference */ if (tmp == NULL) tmp = oap; @@ -2278,10 +2086,10 @@ out: OBD_FREE(pga, sizeof(*pga) * page_count); /* this should happen rarely and is pretty bad, it makes the * pending list not follow the dirty order */ - while (!cfs_list_empty(ext_list)) { - ext = cfs_list_entry(ext_list->next, struct osc_extent, - oe_link); - cfs_list_del_init(&ext->oe_link); + while (!list_empty(ext_list)) { + ext = list_entry(ext_list->next, struct osc_extent, + oe_link); + list_del_init(&ext->oe_link); osc_extent_finish(env, ext, 0, rc); } if (clerq && !IS_ERR(clerq)) @@ -2422,6 +2230,9 @@ static int osc_enqueue_interpret(const struct lu_env *env, * osc_enqueue_fini(). */ ldlm_lock_addref(&handle, mode); + /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */ + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2); + /* Let CP AST to grant the lock first. */ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); @@ -2458,51 +2269,6 @@ static int osc_enqueue_interpret(const struct lu_env *env, return rc; } -void osc_update_enqueue(struct lustre_handle *lov_lockhp, - struct lov_oinfo *loi, __u64 flags, - struct ost_lvb *lvb, __u32 mode, int rc) -{ - struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); - - if (rc == ELDLM_OK) { - __u64 tmp; - - LASSERT(lock != NULL); - loi->loi_lvb = *lvb; - tmp = loi->loi_lvb.lvb_size; - /* Extend KMS up to the end of this lock and no further - * A lock on [x,y] means a KMS of up to y + 1 bytes! */ - if (tmp > lock->l_policy_data.l_extent.end) - tmp = lock->l_policy_data.l_extent.end + 1; - if (tmp >= loi->loi_kms) { - LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64 - ", kms="LPU64, loi->loi_lvb.lvb_size, tmp); - loi_kms_set(loi, tmp); - } else { - LDLM_DEBUG(lock, "lock acquired, setting rss=" - LPU64"; leaving kms="LPU64", end="LPU64, - loi->loi_lvb.lvb_size, loi->loi_kms, - lock->l_policy_data.l_extent.end); - } - ldlm_lock_allow_match(lock); - } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) { - LASSERT(lock != NULL); - loi->loi_lvb = *lvb; - ldlm_lock_allow_match(lock); - CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" - " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms); - rc = ELDLM_OK; - } - - if (lock != NULL) { - if (rc != ELDLM_OK) - ldlm_lock_fail_match(lock); - - LDLM_LOCK_PUT(lock); - } -} -EXPORT_SYMBOL(osc_update_enqueue); - struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock @@ -2602,14 +2368,13 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, no_match: if (intent) { - CFS_LIST_HEAD(cancels); - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_ENQUEUE_LVB); - if (req == NULL) - RETURN(-ENOMEM); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_ENQUEUE_LVB); + if (req == NULL) + RETURN(-ENOMEM); - rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0); - if (rc) { + rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE); + if (rc < 0) { ptlrpc_request_free(req); RETURN(rc); } @@ -2657,23 +2422,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } -static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, - struct ldlm_enqueue_info *einfo, - struct ptlrpc_request_set *rqset) -{ - struct ldlm_res_id res_id; - int rc; - ENTRY; - - ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id); - rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy, - &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, - oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid, - oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh, - rqset, rqset != NULL, 0); - RETURN(rc); -} - int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, __u32 type, ldlm_policy_data_t *policy, __u32 mode, __u64 *flags, void *data, struct lustre_handle *lockh, @@ -2730,29 +2478,6 @@ int osc_cancel_base(struct lustre_handle *lockh, __u32 mode) RETURN(0); } -static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md, - __u32 mode, struct lustre_handle *lockh) -{ - ENTRY; - RETURN(osc_cancel_base(lockh, mode)); -} - -static int osc_cancel_unused(struct obd_export *exp, - struct lov_stripe_md *lsm, - ldlm_cancel_flags_t flags, - void *opaque) -{ - struct obd_device *obd = class_exp2obd(exp); - struct ldlm_res_id res_id, *resp = NULL; - - if (lsm != NULL) { - ostid_build_res_name(&lsm->lsm_oi, &res_id); - resp = &res_id; - } - - return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque); -} - static int osc_statfs_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct osc_async_args *aa, int rc) @@ -2894,70 +2619,6 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp, return rc; } -/* Retrieve object striping information. - * - * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating - * the maximum number of OST indices which will fit in the user buffer. - * lmm_magic must be LOV_MAGIC (we only use 1 slot here). - */ -static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) -{ - /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */ - struct lov_user_md_v3 lum, *lumk; - struct lov_user_ost_data_v1 *lmm_objects; - int rc = 0, lum_size; - ENTRY; - - if (!lsm) - RETURN(-ENODATA); - - /* we only need the header part from user space to get lmm_magic and - * lmm_stripe_count, (the header part is common to v1 and v3) */ - lum_size = sizeof(struct lov_user_md_v1); - if (copy_from_user(&lum, lump, lum_size)) - RETURN(-EFAULT); - - if ((lum.lmm_magic != LOV_USER_MAGIC_V1) && - (lum.lmm_magic != LOV_USER_MAGIC_V3)) - RETURN(-EINVAL); - - /* lov_user_md_vX and lov_mds_md_vX must have the same size */ - LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1)); - LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3)); - LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0])); - - /* we can use lov_mds_md_size() to compute lum_size - * because lov_user_md_vX and lov_mds_md_vX have the same size */ - if (lum.lmm_stripe_count > 0) { - lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic); - OBD_ALLOC(lumk, lum_size); - if (!lumk) - RETURN(-ENOMEM); - - if (lum.lmm_magic == LOV_USER_MAGIC_V1) - lmm_objects = - &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]); - else - lmm_objects = &(lumk->lmm_objects[0]); - lmm_objects->l_ost_oi = lsm->lsm_oi; - } else { - lum_size = lov_mds_md_size(0, lum.lmm_magic); - lumk = &lum; - } - - lumk->lmm_oi = lsm->lsm_oi; - lumk->lmm_stripe_count = 1; - - if (copy_to_user(lump, lumk, lum_size)) - rc = -EFAULT; - - if (lumk != &lum) - OBD_FREE(lumk, lum_size); - - RETURN(rc); -} - - static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) { @@ -2971,53 +2632,6 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, return -EINVAL; } switch (cmd) { - case OBD_IOC_LOV_GET_CONFIG: { - char *buf; - struct lov_desc *desc; - struct obd_uuid uuid; - - buf = NULL; - len = 0; - if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) - GOTO(out, err = -EINVAL); - - data = (struct obd_ioctl_data *)buf; - - if (sizeof(*desc) > data->ioc_inllen1) { - obd_ioctl_freedata(buf, len); - GOTO(out, err = -EINVAL); - } - - if (data->ioc_inllen2 < sizeof(uuid)) { - obd_ioctl_freedata(buf, len); - GOTO(out, err = -EINVAL); - } - - desc = (struct lov_desc *)data->ioc_inlbuf1; - desc->ld_tgt_count = 1; - desc->ld_active_tgt_count = 1; - desc->ld_default_stripe_count = 1; - desc->ld_default_stripe_size = 0; - desc->ld_default_stripe_offset = 0; - desc->ld_pattern = 0; - memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid)); - - memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid)); - - err = copy_to_user((void *)uarg, buf, len); - if (err) - err = -EFAULT; - obd_ioctl_freedata(buf, len); - GOTO(out, err); - } - case LL_IOC_LOV_SETSTRIPE: - err = obd_alloc_memmd(exp, karg); - if (err > 0) - err = 0; - GOTO(out, err); - case LL_IOC_LOV_GETSTRIPE: - err = osc_getstripe(karg, uarg); - GOTO(out, err); case OBD_IOC_CLIENT_RECOVER: err = ptlrpc_recover_import(obd->u.cli.cl_import, data->ioc_inlbuf1, 0); @@ -3215,13 +2829,13 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, LASSERT(cli->cl_cache == NULL); /* only once */ cli->cl_cache = (struct cl_client_cache *)val; - cfs_atomic_inc(&cli->cl_cache->ccc_users); + atomic_inc(&cli->cl_cache->ccc_users); cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; /* add this osc into entity list */ - LASSERT(cfs_list_empty(&cli->cl_lru_osc)); + LASSERT(list_empty(&cli->cl_lru_osc)); spin_lock(&cli->cl_cache->ccc_lru_lock); - cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru); + list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru); spin_unlock(&cli->cl_cache->ccc_lru_lock); RETURN(0); @@ -3229,10 +2843,10 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, if (KEY_IS(KEY_CACHE_LRU_SHRINK)) { struct client_obd *cli = &obd->u.cli; - int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1; + int nr = atomic_read(&cli->cl_lru_in_list) >> 1; int target = *(int *)val; - nr = osc_lru_shrink(cli, min(nr, target)); + nr = osc_lru_shrink(env, cli, min(nr, target), true); *(int *)val -= nr; RETURN(0); } @@ -3298,34 +2912,6 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(0); } - -static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, - struct obd_device *disk_obd, int *index) -{ - /* this code is not supposed to be used with LOD/OSP - * to be removed soon */ - LBUG(); - return 0; -} - -static int osc_llog_finish(struct obd_device *obd, int count) -{ - struct llog_ctxt *ctxt; - - ENTRY; - - ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); - if (ctxt) { - llog_cat_close(NULL, ctxt->loc_handle); - llog_cleanup(NULL, ctxt); - } - - ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); - if (ctxt) - llog_cleanup(NULL, ctxt); - RETURN(0); -} - static int osc_reconnect(const struct lu_env *env, struct obd_export *exp, struct obd_device *obd, struct obd_uuid *cluuid, @@ -3475,23 +3061,17 @@ static int osc_import_event(struct obd_device *obd, * \retval zero the lock can't be canceled * \retval other ok to cancel */ -static int osc_cancel_for_recovery(struct ldlm_lock *lock) +static int osc_cancel_weight(struct ldlm_lock *lock) { - check_res_locked(lock->l_resource); - - /* - * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR. - * - * XXX as a future improvement, we can also cancel unused write lock - * if it doesn't have dirty data and active mmaps. - */ - if (lock->l_resource->lr_type == LDLM_EXTENT && - (lock->l_granted_mode == LCK_PR || - lock->l_granted_mode == LCK_CR) && - (osc_dlm_lock_pageref(lock) == 0)) - RETURN(1); + /* + * Cancel all unused and granted extent lock. + */ + if (lock->l_resource->lr_type == LDLM_EXTENT && + lock->l_granted_mode == lock->l_req_mode && + osc_ldlm_weigh_ast(lock) == 0) + RETURN(1); - RETURN(0); + RETURN(0); } static int brw_queue_work(const struct lu_env *env, void *data) @@ -3506,10 +3086,10 @@ static int brw_queue_work(const struct lu_env *env, void *data) int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { - struct lprocfs_static_vars lvars = { 0 }; - struct client_obd *cli = &obd->u.cli; - void *handler; - int rc; + struct client_obd *cli = &obd->u.cli; + struct obd_type *type; + void *handler; + int rc; ENTRY; rc = ptlrpcd_addref(); @@ -3525,13 +3105,42 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) GOTO(out_client_setup, rc = PTR_ERR(handler)); cli->cl_writeback_work = handler; + handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli); + if (IS_ERR(handler)) + GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler)); + cli->cl_lru_work = handler; + rc = osc_quota_setup(obd); if (rc) GOTO(out_ptlrpcd_work, rc); cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; - lprocfs_osc_init_vars(&lvars); - if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) { + +#ifdef LPROCFS + obd->obd_vars = lprocfs_osc_obd_vars; +#endif + /* If this is true then both client (osc) and server (osp) are on the + * same node. The osp layer if loaded first will register the osc proc + * directory. In that case this obd_device will be attached its proc + * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */ + type = class_search_type(LUSTRE_OSP_NAME); + if (type && type->typ_procsym) { + obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name, + type->typ_procsym, + obd->obd_vars, obd); + if (IS_ERR(obd->obd_proc_entry)) { + rc = PTR_ERR(obd->obd_proc_entry); + CERROR("error %d setting up lprocfs for %s\n", rc, + obd->obd_name); + obd->obd_proc_entry = NULL; + } + } else { + rc = lprocfs_seq_obd_setup(obd); + } + + /* If the basic OSC proc tree construction succeeded then + * lets do the rest. */ + if (rc == 0) { lproc_osc_attach_seqstat(obd); sptlrpc_lprocfs_cliobd_attach(obd); ptlrpc_lprocfs_register_obd(obd); @@ -3547,12 +3156,19 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) OST_MAXREQSIZE, ptlrpc_add_rqs_to_pool); - CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list); - ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery); - RETURN(rc); + INIT_LIST_HEAD(&cli->cl_grant_shrink_list); + ns_register_cancel(obd->obd_namespace, osc_cancel_weight); + RETURN(0); out_ptlrpcd_work: - ptlrpcd_destroy_work(handler); + if (cli->cl_writeback_work != NULL) { + ptlrpcd_destroy_work(cli->cl_writeback_work); + cli->cl_writeback_work = NULL; + } + if (cli->cl_lru_work != NULL) { + ptlrpcd_destroy_work(cli->cl_lru_work); + cli->cl_lru_work = NULL; + } out_client_setup: client_obd_cleanup(obd); out_ptlrpcd: @@ -3593,6 +3209,10 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) ptlrpcd_destroy_work(cli->cl_writeback_work); cli->cl_writeback_work = NULL; } + if (cli->cl_lru_work) { + ptlrpcd_destroy_work(cli->cl_lru_work); + cli->cl_lru_work = NULL; + } obd_cleanup_client_import(obd); ptlrpc_lprocfs_unregister_obd(obd); lprocfs_obd_cleanup(obd); @@ -3614,12 +3234,12 @@ int osc_cleanup(struct obd_device *obd) /* lru cleanup */ if (cli->cl_cache != NULL) { - LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0); + LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0); spin_lock(&cli->cl_cache->ccc_lru_lock); - cfs_list_del_init(&cli->cl_lru_osc); + list_del_init(&cli->cl_lru_osc); spin_unlock(&cli->cl_cache->ccc_lru_lock); cli->cl_lru_left = NULL; - cfs_atomic_dec(&cli->cl_cache->ccc_users); + atomic_dec(&cli->cl_cache->ccc_users); cli->cl_cache = NULL; } @@ -3634,21 +3254,9 @@ int osc_cleanup(struct obd_device *obd) int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) { - struct lprocfs_static_vars lvars = { 0 }; - int rc = 0; - - lprocfs_osc_init_vars(&lvars); - - switch (lcfg->lcfg_command) { - default: - rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, - lcfg, obd); - if (rc > 0) - rc = 0; - break; - } - - return(rc); + int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars, + lcfg, obd); + return rc > 0 ? 0: rc; } static int osc_process_config(struct obd_device *obd, obd_count len, void *buf) @@ -3668,7 +3276,6 @@ struct obd_ops osc_obd_ops = { .o_disconnect = osc_disconnect, .o_statfs = osc_statfs, .o_statfs_async = osc_statfs_async, - .o_packmd = osc_packmd, .o_unpackmd = osc_unpackmd, .o_create = osc_create, .o_destroy = osc_destroy, @@ -3676,20 +3283,12 @@ struct obd_ops osc_obd_ops = { .o_getattr_async = osc_getattr_async, .o_setattr = osc_setattr, .o_setattr_async = osc_setattr_async, - .o_brw = osc_brw, - .o_punch = osc_punch, - .o_sync = osc_sync, - .o_enqueue = osc_enqueue, .o_change_cbdata = osc_change_cbdata, .o_find_cbdata = osc_find_cbdata, - .o_cancel = osc_cancel, - .o_cancel_unused = osc_cancel_unused, .o_iocontrol = osc_iocontrol, .o_get_info = osc_get_info, .o_set_info_async = osc_set_info_async, .o_import_event = osc_import_event, - .o_llog_init = osc_llog_init, - .o_llog_finish = osc_llog_finish, .o_process_config = osc_process_config, .o_quotactl = osc_quotactl, .o_quotacheck = osc_quotacheck, @@ -3701,9 +3300,10 @@ extern struct lock_class_key osc_ast_guard_class; int __init osc_init(void) { - struct lprocfs_static_vars lvars = { 0 }; - int rc; - ENTRY; + bool enable_proc = true; + struct obd_type *type; + int rc; + ENTRY; /* print an address of _any_ initialized kernel symbol from this * module, to allow debugging with gdb that doesn't support data @@ -3714,10 +3314,15 @@ int __init osc_init(void) if (rc) RETURN(rc); - lprocfs_osc_init_vars(&lvars); + type = class_search_type(LUSTRE_OSP_NAME); + if (type != NULL && type->typ_procsym != NULL) + enable_proc = false; - rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars, - LUSTRE_OSC_NAME, &osc_device_type); + rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL, +#ifndef HAVE_ONLY_PROCFS_SEQ + NULL, +#endif + LUSTRE_OSC_NAME, &osc_device_type); if (rc) { lu_kmem_fini(osc_caches); RETURN(rc);