X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdecho%2Fecho_client.c;h=6905ce687e6681a71a984146d51c5dfcadd0cd46;hb=6fc066434a3b8d4cfee0e038eb5da1cf4c8b5c74;hp=2c21917ac99e8703a4ee7d94fe86c7b7034a1c32;hpb=8ee22413bbcab1ac89758ac2dedbcfd696e75205;p=fs%2Flustre-release.git diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 2c21917..6905ce6 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -47,8 +47,10 @@ #include #include #include +#include #include #include +#include #include #include "echo_internal.h" @@ -74,9 +76,9 @@ struct echo_object { struct cl_object_header eo_hdr; struct echo_device *eo_dev; - cfs_list_t eo_obj_chain; + struct list_head eo_obj_chain; struct lov_stripe_md *eo_lsm; - cfs_atomic_t eo_npages; + atomic_t eo_npages; int eo_deleted; }; @@ -86,29 +88,18 @@ struct echo_object_conf { }; struct echo_page { - struct cl_page_slice ep_cl; - cfs_mutex_t ep_lock; - cfs_page_t *ep_vmpage; + struct cl_page_slice ep_cl; + struct mutex ep_lock; }; struct echo_lock { - struct cl_lock_slice el_cl; - cfs_list_t el_chain; - struct echo_object *el_object; - __u64 el_cookie; - cfs_atomic_t el_refcount; + struct cl_lock_slice el_cl; + struct list_head el_chain; + struct echo_object *el_object; + __u64 el_cookie; + atomic_t el_refcount; }; -struct echo_io { - struct cl_io_slice ei_cl; -}; - -#if 0 -struct echo_req { - struct cl_req_slice er_cl; -}; -#endif - static int echo_client_setup(const struct lu_env *env, struct obd_device *obddev, struct lustre_cfg *lcfg); @@ -173,43 +164,36 @@ struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c) return container_of(c, struct echo_object_conf, eoc_cl); } -static inline void lsm2fid(struct lov_stripe_md *lsm, struct lu_fid *fid) -{ - fid_zero(fid); - fid->f_seq = FID_SEQ_ECHO; - /* truncated to 32 bits by assignment */ - fid->f_oid = lsm->lsm_object_id; - fid->f_ver = lsm->lsm_object_id >> 32; -} /** @} echo_helpers */ static struct echo_object *cl_echo_object_find(struct echo_device *d, struct lov_stripe_md **lsm); static int cl_echo_object_put(struct echo_object *eco); -static int cl_echo_enqueue (struct echo_object *eco, obd_off start, - obd_off end, int mode, __u64 *cookie); -static int cl_echo_cancel (struct echo_device *d, __u64 cookie); static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset, - cfs_page_t **pages, int npages, int async); - -static struct echo_thread_info *echo_env_info(const struct lu_env *env); + struct page **pages, int npages, int async); struct echo_thread_info { - struct echo_object_conf eti_conf; - struct lustre_md eti_md; - - struct cl_2queue eti_queue; - struct cl_io eti_io; - struct cl_lock_descr eti_descr; - struct lu_fid eti_fid; - struct md_op_spec eti_spec; - struct lov_mds_md_v3 eti_lmm; - struct lov_user_md_v3 eti_lum; - struct md_attr eti_ma; - struct lu_name eti_lname; - char eti_name[20]; - struct lu_buf eti_buf; - char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE]; + struct echo_object_conf eti_conf; + struct lustre_md eti_md; + + struct cl_2queue eti_queue; + struct cl_io eti_io; + struct cl_lock_descr eti_descr; + struct lu_fid eti_fid; + struct lu_fid eti_fid2; +#ifdef HAVE_SERVER_SUPPORT + struct md_op_spec eti_spec; + struct lov_mds_md_v3 eti_lmm; + struct lov_user_md_v3 eti_lum; + struct md_attr eti_ma; + struct lu_name eti_lname; + /* per-thread values, can be re-used */ + void *eti_big_lmm; + int eti_big_lmmsize; + char eti_name[20]; + struct lu_buf eti_buf; + char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE]; +#endif }; /* No session used right now */ @@ -217,20 +201,14 @@ struct echo_session_info { unsigned long dummy; }; -static cfs_mem_cache_t *echo_page_kmem; -static cfs_mem_cache_t *echo_lock_kmem; -static cfs_mem_cache_t *echo_object_kmem; -static cfs_mem_cache_t *echo_thread_kmem; -static cfs_mem_cache_t *echo_session_kmem; -//static cfs_mem_cache_t *echo_req_kmem; +static struct kmem_cache *echo_lock_kmem; +static struct kmem_cache *echo_object_kmem; +static struct kmem_cache *echo_thread_kmem; +static struct kmem_cache *echo_session_kmem; +/* static struct kmem_cache *echo_req_kmem; */ static struct lu_kmem_descr echo_caches[] = { { - .ckd_cache = &echo_page_kmem, - .ckd_name = "echo_page_kmem", - .ckd_size = sizeof (struct echo_page) - }, - { .ckd_cache = &echo_lock_kmem, .ckd_name = "echo_lock_kmem", .ckd_size = sizeof (struct echo_lock) @@ -250,13 +228,6 @@ static struct lu_kmem_descr echo_caches[] = { .ckd_name = "echo_session_kmem", .ckd_size = sizeof (struct echo_session_info) }, -#if 0 - { - .ckd_cache = &echo_req_kmem, - .ckd_name = "echo_req_kmem", - .ckd_size = sizeof (struct echo_req) - }, -#endif { .ckd_cache = NULL } @@ -268,12 +239,6 @@ static struct lu_kmem_descr echo_caches[] = { * * @{ */ -static cfs_page_t *echo_page_vmpage(const struct lu_env *env, - const struct cl_page_slice *slice) -{ - return cl2echo_page(slice)->ep_vmpage; -} - static int echo_page_own(const struct lu_env *env, const struct cl_page_slice *slice, struct cl_io *io, int nonblock) @@ -281,8 +246,8 @@ static int echo_page_own(const struct lu_env *env, struct echo_page *ep = cl2echo_page(slice); if (!nonblock) - cfs_mutex_lock(&ep->ep_lock); - else if (!cfs_mutex_trylock(&ep->ep_lock)) + mutex_lock(&ep->ep_lock); + else if (!mutex_trylock(&ep->ep_lock)) return -EAGAIN; return 0; } @@ -293,8 +258,8 @@ static void echo_page_disown(const struct lu_env *env, { struct echo_page *ep = cl2echo_page(slice); - LASSERT(cfs_mutex_is_locked(&ep->ep_lock)); - cfs_mutex_unlock(&ep->ep_lock); + LASSERT(mutex_is_locked(&ep->ep_lock)); + mutex_unlock(&ep->ep_lock); } static void echo_page_discard(const struct lu_env *env, @@ -307,7 +272,7 @@ static void echo_page_discard(const struct lu_env *env, static int echo_page_is_vmlocked(const struct lu_env *env, const struct cl_page_slice *slice) { - if (cfs_mutex_is_locked(&cl2echo_page(slice)->ep_lock)) + if (mutex_is_locked(&cl2echo_page(slice)->ep_lock)) return -EBUSY; return -ENODATA; } @@ -320,17 +285,14 @@ static void echo_page_completion(const struct lu_env *env, } static void echo_page_fini(const struct lu_env *env, - struct cl_page_slice *slice) + struct cl_page_slice *slice) { - struct echo_page *ep = cl2echo_page(slice); - struct echo_object *eco = cl2echo_obj(slice->cpl_obj); - cfs_page_t *vmpage = ep->ep_vmpage; - ENTRY; + struct echo_object *eco = cl2echo_obj(slice->cpl_obj); + ENTRY; - cfs_atomic_dec(&eco->eo_npages); - page_cache_release(vmpage); - OBD_SLAB_FREE_PTR(ep, echo_page_kmem); - EXIT; + atomic_dec(&eco->eo_npages); + page_cache_release(slice->cpl_page->cp_vmpage); + EXIT; } static int echo_page_prep(const struct lu_env *env, @@ -344,18 +306,18 @@ static int echo_page_print(const struct lu_env *env, const struct cl_page_slice *slice, void *cookie, lu_printer_t printer) { - struct echo_page *ep = cl2echo_page(slice); + struct echo_page *ep = cl2echo_page(slice); - (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n", - ep, cfs_mutex_is_locked(&ep->ep_lock), ep->ep_vmpage); - return 0; + (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n", + ep, mutex_is_locked(&ep->ep_lock), + slice->cpl_page->cp_vmpage); + return 0; } static const struct cl_page_operations echo_page_ops = { .cpo_own = echo_page_own, .cpo_disown = echo_page_disown, .cpo_discard = echo_page_discard, - .cpo_vmpage = echo_page_vmpage, .cpo_fini = echo_page_fini, .cpo_print = echo_page_print, .cpo_is_vmlocked = echo_page_is_vmlocked, @@ -383,7 +345,7 @@ static void echo_lock_fini(const struct lu_env *env, { struct echo_lock *ecl = cl2echo_lock(slice); - LASSERT(cfs_list_empty(&ecl->el_chain)); + LASSERT(list_empty(&ecl->el_chain)); OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem); } @@ -392,7 +354,7 @@ static void echo_lock_delete(const struct lu_env *env, { struct echo_lock *ecl = cl2echo_lock(slice); - LASSERT(cfs_list_empty(&ecl->el_chain)); + LASSERT(list_empty(&ecl->el_chain)); } static int echo_lock_fits_into(const struct lu_env *env, @@ -417,23 +379,18 @@ static struct cl_lock_operations echo_lock_ops = { * * @{ */ -static struct cl_page *echo_page_init(const struct lu_env *env, - struct cl_object *obj, - struct cl_page *page, cfs_page_t *vmpage) +static int echo_page_init(const struct lu_env *env, struct cl_object *obj, + struct cl_page *page, pgoff_t index) { - struct echo_page *ep; - ENTRY; + struct echo_page *ep = cl_object_page_slice(obj, page); + struct echo_object *eco = cl2echo_obj(obj); + ENTRY; - OBD_SLAB_ALLOC_PTR_GFP(ep, echo_page_kmem, CFS_ALLOC_IO); - if (ep != NULL) { - struct echo_object *eco = cl2echo_obj(obj); - ep->ep_vmpage = vmpage; - page_cache_get(vmpage); - cfs_mutex_init(&ep->ep_lock); - cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops); - cfs_atomic_inc(&eco->eo_npages); - } - RETURN(ERR_PTR(ep ? 0 : -ENOMEM)); + page_cache_get(page->cp_vmpage); + mutex_init(&ep->ep_lock); + cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops); + atomic_inc(&eco->eo_npages); + RETURN(0); } static int echo_io_init(const struct lu_env *env, struct cl_object *obj, @@ -443,20 +400,20 @@ static int echo_io_init(const struct lu_env *env, struct cl_object *obj, } static int echo_lock_init(const struct lu_env *env, - struct cl_object *obj, struct cl_lock *lock, - const struct cl_io *unused) + struct cl_object *obj, struct cl_lock *lock, + const struct cl_io *unused) { - struct echo_lock *el; - ENTRY; + struct echo_lock *el; + ENTRY; - OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, CFS_ALLOC_IO); - if (el != NULL) { - cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops); - el->el_object = cl2echo_obj(obj); - CFS_INIT_LIST_HEAD(&el->el_chain); - cfs_atomic_set(&el->el_refcount, 0); - } - RETURN(el == NULL ? -ENOMEM : 0); + OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS); + if (el != NULL) { + cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops); + el->el_object = cl2echo_obj(obj); + INIT_LIST_HEAD(&el->el_chain); + atomic_set(&el->el_refcount, 0); + } + RETURN(el == NULL ? -ENOMEM : 0); } static int echo_conf_set(const struct lu_env *env, struct cl_object *obj, @@ -512,35 +469,86 @@ static int echo_object_init(const struct lu_env *env, struct lu_object *obj, } eco->eo_dev = ed; - cfs_atomic_set(&eco->eo_npages, 0); + atomic_set(&eco->eo_npages, 0); + cl_object_page_init(lu2cl(obj), sizeof(struct echo_page)); - cfs_spin_lock(&ec->ec_lock); - cfs_list_add_tail(&eco->eo_obj_chain, &ec->ec_objects); - cfs_spin_unlock(&ec->ec_lock); + spin_lock(&ec->ec_lock); + list_add_tail(&eco->eo_obj_chain, &ec->ec_objects); + spin_unlock(&ec->ec_lock); - RETURN(0); + RETURN(0); +} + +/* taken from osc_unpackmd() */ +static int echo_alloc_memmd(struct echo_device *ed, + struct lov_stripe_md **lsmp) +{ + int lsm_size; + + ENTRY; + + /* If export is lov/osc then use their obd method */ + if (ed->ed_next != NULL) + return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp); + /* OFD has no unpackmd method, do everything here */ + lsm_size = lov_stripe_md_size(1); + + LASSERT(*lsmp == NULL); + OBD_ALLOC(*lsmp, lsm_size); + if (*lsmp == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + if ((*lsmp)->lsm_oinfo[0] == NULL) { + OBD_FREE(*lsmp, lsm_size); + RETURN(-ENOMEM); + } + + loi_init((*lsmp)->lsm_oinfo[0]); + (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + ostid_set_seq_echo(&(*lsmp)->lsm_oi); + + RETURN(lsm_size); +} + +static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp) +{ + int lsm_size; + + ENTRY; + + /* If export is lov/osc then use their obd method */ + if (ed->ed_next != NULL) + return obd_free_memmd(ed->ed_ec->ec_exp, lsmp); + /* OFD has no unpackmd method, do everything here */ + lsm_size = lov_stripe_md_size(1); + + LASSERT(*lsmp != NULL); + OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + OBD_FREE(*lsmp, lsm_size); + *lsmp = NULL; + RETURN(0); } static void echo_object_free(const struct lu_env *env, struct lu_object *obj) { struct echo_object *eco = cl2echo_obj(lu2cl(obj)); struct echo_client_obd *ec = eco->eo_dev->ed_ec; - struct lov_stripe_md *lsm = eco->eo_lsm; ENTRY; - LASSERT(cfs_atomic_read(&eco->eo_npages) == 0); + LASSERT(atomic_read(&eco->eo_npages) == 0); - cfs_spin_lock(&ec->ec_lock); - cfs_list_del_init(&eco->eo_obj_chain); - cfs_spin_unlock(&ec->ec_lock); + spin_lock(&ec->ec_lock); + list_del_init(&eco->eo_obj_chain); + spin_unlock(&ec->ec_lock); lu_object_fini(obj); lu_object_header_fini(obj->lo_header); - if (lsm) - obd_free_memmd(ec->ec_exp, &lsm); - OBD_SLAB_FREE_PTR(eco, echo_object_kmem); - EXIT; + if (eco->eo_lsm) + echo_free_memmd(eco->eo_dev, &eco->eo_lsm); + OBD_SLAB_FREE_PTR(eco, echo_object_kmem); + EXIT; } static int echo_object_print(const struct lu_env *env, void *cookie, @@ -568,28 +576,30 @@ static const struct lu_object_operations echo_lu_obj_ops = { * @{ */ static struct lu_object *echo_object_alloc(const struct lu_env *env, - const struct lu_object_header *hdr, - struct lu_device *dev) + const struct lu_object_header *hdr, + struct lu_device *dev) { - struct echo_object *eco; - struct lu_object *obj = NULL; - ENTRY; + struct echo_object *eco; + struct lu_object *obj = NULL; + ENTRY; - /* we're the top dev. */ - LASSERT(hdr == NULL); - OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, CFS_ALLOC_IO); - if (eco != NULL) { - struct cl_object_header *hdr = &eco->eo_hdr; + /* we're the top dev. */ + LASSERT(hdr == NULL); + OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS); + if (eco != NULL) { + struct cl_object_header *hdr = &eco->eo_hdr; - obj = &echo_obj2cl(eco)->co_lu; - cl_object_header_init(hdr); - lu_object_init(obj, &hdr->coh_lu, dev); - lu_object_add_top(&hdr->coh_lu, obj); + obj = &echo_obj2cl(eco)->co_lu; + cl_object_header_init(hdr); + hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page)); - eco->eo_cl.co_ops = &echo_cl_obj_ops; - obj->lo_ops = &echo_lu_obj_ops; - } - RETURN(obj); + lu_object_init(obj, &hdr->coh_lu, dev); + lu_object_add_top(&hdr->coh_lu, obj); + + eco->eo_cl.co_ops = &echo_cl_obj_ops; + obj->lo_ops = &echo_lu_obj_ops; + } + RETURN(obj); } static struct lu_device_operations echo_device_lu_ops = { @@ -637,14 +647,14 @@ static void echo_site_fini(const struct lu_env *env, struct echo_device *ed) } static void *echo_thread_key_init(const struct lu_context *ctx, - struct lu_context_key *key) + struct lu_context_key *key) { - struct echo_thread_info *info; + struct echo_thread_info *info; - OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, CFS_ALLOC_IO); - if (info == NULL) - info = ERR_PTR(-ENOMEM); - return info; + OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS); + if (info == NULL) + info = ERR_PTR(-ENOMEM); + return info; } static void echo_thread_key_fini(const struct lu_context *ctx, @@ -667,14 +677,14 @@ static struct lu_context_key echo_thread_key = { }; static void *echo_session_key_init(const struct lu_context *ctx, - struct lu_context_key *key) + struct lu_context_key *key) { - struct echo_session_info *session; + struct echo_session_info *session; - OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, CFS_ALLOC_IO); - if (session == NULL) - session = ERR_PTR(-ENOMEM); - return session; + OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS); + if (session == NULL) + session = ERR_PTR(-ENOMEM); + return session; } static void echo_session_key_fini(const struct lu_context *ctx, @@ -698,9 +708,10 @@ static struct lu_context_key echo_session_key = { LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key); -#define ECHO_SEQ_WIDTH 0xffffffff +#ifdef HAVE_SERVER_SUPPORT +# define ECHO_SEQ_WIDTH 0xffffffff static int echo_fid_init(struct echo_device *ed, char *obd_name, - struct md_site *ms) + struct seq_server_site *ss) { char *prefix; int rc; @@ -716,10 +727,10 @@ static int echo_fid_init(struct echo_device *ed, char *obd_name, snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name); - /* Init client side sequence-manager */ - rc = seq_client_init(ed->ed_cl_seq, NULL, - LUSTRE_SEQ_METADATA, - prefix, ms->ms_server_seq); + /* Init client side sequence-manager */ + rc = seq_client_init(ed->ed_cl_seq, NULL, + LUSTRE_SEQ_METADATA, + prefix, ss->ss_server_seq); ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH; OBD_FREE(prefix, MAX_OBD_NAME + 5); if (rc) @@ -746,6 +757,7 @@ static int echo_fid_fini(struct obd_device *obddev) RETURN(0); } +#endif /* HAVE_SERVER_SUPPORT */ static struct lu_device *echo_device_alloc(const struct lu_env *env, struct lu_device_type *t, @@ -805,6 +817,7 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env, cleanup = 4; if (ed->ed_next_ismd) { +#ifdef HAVE_SERVER_SUPPORT /* Suppose to connect to some Metadata layer */ struct lu_site *ls; struct lu_device *ld; @@ -826,14 +839,14 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env, ls = next->ld_site; - cfs_spin_lock(&ls->ls_ld_lock); - cfs_list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) { - if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) { - found = 1; - break; - } - } - cfs_spin_unlock(&ls->ls_ld_lock); + spin_lock(&ls->ls_ld_lock); + list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) { + if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) { + found = 1; + break; + } + } + spin_unlock(&ls->ls_ld_lock); if (found == 0) { CERROR("%s is not lu device type!\n", @@ -846,11 +859,17 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env, ed->ed_site_myself.cs_lu = *ls; ed->ed_site = &ed->ed_site_myself; ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu; - rc = echo_fid_init(ed, obd->obd_name, lu_site2md(ls)); - if (rc) { - CERROR("echo fid init error %d\n", rc); - GOTO(out, rc); - } + rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls)); + if (rc) { + CERROR("echo fid init error %d\n", rc); + GOTO(out, rc); + } +#else /* !HAVE_SERVER_SUPPORT */ + CERROR("Local operations are NOT supported on client side. " + "Only remote operations are supported. Metadata client " + "must be run on server side.\n"); + GOTO(out, rc = -EOPNOTSUPP); +#endif } else { /* if echo client is to be stacked upon ost device, the next is * NULL since ost is not a clio device so far */ @@ -961,38 +980,40 @@ static struct lu_device *echo_device_free(const struct lu_env *env, * all of cached objects. Anyway, probably the echo device is being * parallelly accessed. */ - cfs_spin_lock(&ec->ec_lock); - cfs_list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain) - eco->eo_deleted = 1; - cfs_spin_unlock(&ec->ec_lock); - - /* purge again */ - lu_site_purge(env, &ed->ed_site->cs_lu, -1); - - CDEBUG(D_INFO, - "Waiting for the reference of echo object to be dropped\n"); - - /* Wait for the last reference to be dropped. */ - cfs_spin_lock(&ec->ec_lock); - while (!cfs_list_empty(&ec->ec_objects)) { - cfs_spin_unlock(&ec->ec_lock); - CERROR("echo_client still has objects at cleanup time, " - "wait for 1 second\n"); - cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT, - cfs_time_seconds(1)); - lu_site_purge(env, &ed->ed_site->cs_lu, -1); - cfs_spin_lock(&ec->ec_lock); - } - cfs_spin_unlock(&ec->ec_lock); - - LASSERT(cfs_list_empty(&ec->ec_locks)); - - CDEBUG(D_INFO, "No object exists, exiting...\n"); - - echo_client_cleanup(d->ld_obd); - echo_fid_fini(d->ld_obd); - while (next && !ed->ed_next_ismd) - next = next->ld_type->ldt_ops->ldto_device_free(env, next); + spin_lock(&ec->ec_lock); + list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain) + eco->eo_deleted = 1; + spin_unlock(&ec->ec_lock); + + /* purge again */ + lu_site_purge(env, &ed->ed_site->cs_lu, -1); + + CDEBUG(D_INFO, + "Waiting for the reference of echo object to be dropped\n"); + + /* Wait for the last reference to be dropped. */ + spin_lock(&ec->ec_lock); + while (!list_empty(&ec->ec_objects)) { + spin_unlock(&ec->ec_lock); + CERROR("echo_client still has objects at cleanup time, " + "wait for 1 second\n"); + schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE, + cfs_time_seconds(1)); + lu_site_purge(env, &ed->ed_site->cs_lu, -1); + spin_lock(&ec->ec_lock); + } + spin_unlock(&ec->ec_lock); + + LASSERT(list_empty(&ec->ec_locks)); + + CDEBUG(D_INFO, "No object exists, exiting...\n"); + + echo_client_cleanup(d->ld_obd); +#ifdef HAVE_SERVER_SUPPORT + echo_fid_fini(d->ld_obd); +#endif + while (next && !ed->ed_next_ismd) + next = next->ld_type->ldt_ops->ldto_device_free(env, next); LASSERT(ed->ed_site == lu2cl_site(d->ld_site)); echo_site_fini(env, ed); @@ -1042,12 +1063,15 @@ static struct echo_object *cl_echo_object_find(struct echo_device *d, struct cl_object *obj; struct lu_fid *fid; int refcheck; - ENTRY; + int rc; + ENTRY; - LASSERT(lsmp); - lsm = *lsmp; - LASSERT(lsm); - LASSERT(lsm->lsm_object_id); + LASSERT(lsmp); + lsm = *lsmp; + LASSERT(lsm); + LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi)); + LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n", + POSTID(&lsm->lsm_oi)); /* Never return an object if the obd is to be freed. */ if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping) @@ -1063,8 +1087,7 @@ static struct echo_object *cl_echo_object_find(struct echo_device *d, if (!d->ed_next_islov) { struct lov_oinfo *oinfo = lsm->lsm_oinfo[0]; LASSERT(oinfo != NULL); - oinfo->loi_id = lsm->lsm_object_id; - oinfo->loi_seq = lsm->lsm_object_seq; + oinfo->loi_oi = lsm->lsm_oi; conf->eoc_cl.u.coc_oinfo = oinfo; } else { struct lustre_md *md; @@ -1076,9 +1099,14 @@ static struct echo_object *cl_echo_object_find(struct echo_device *d, } conf->eoc_md = lsmp; - fid = &info->eti_fid; - lsm2fid(lsm, fid); + fid = &info->eti_fid; + rc = ostid_to_fid(fid, &lsm->lsm_oi, 0); + if (rc != 0) + GOTO(out, eco = ERR_PTR(rc)); + /* In the function below, .hs_keycmp resolves to + * lu_obj_hop_keycmp() */ + /* coverity[overrun-buffer-val] */ obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl); if (IS_ERR(obj)) GOTO(out, eco = (void*)obj); @@ -1109,7 +1137,7 @@ static int cl_echo_object_put(struct echo_object *eco) if (eco->eo_deleted) { struct lu_object_header *loh = obj->co_lu.lo_header; LASSERT(&eco->eo_hdr == luh2coh(loh)); - cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags); + set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags); } cl_object_put(env, obj); @@ -1149,49 +1177,19 @@ static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco, rc = cl_wait(env, lck); if (rc == 0) { el = cl2echo_lock(cl_lock_at(lck, &echo_device_type)); - cfs_spin_lock(&ec->ec_lock); - if (cfs_list_empty(&el->el_chain)) { - cfs_list_add(&el->el_chain, &ec->ec_locks); - el->el_cookie = ++ec->ec_unique; - } - cfs_atomic_inc(&el->el_refcount); - *cookie = el->el_cookie; - cfs_spin_unlock(&ec->ec_lock); - } else - cl_lock_release(env, lck, "ec enqueue", cfs_current()); - } - RETURN(rc); -} - -static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end, - int mode, __u64 *cookie) -{ - struct echo_thread_info *info; - struct lu_env *env; - struct cl_io *io; - int refcheck; - int result; - ENTRY; - - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); - - info = echo_env_info(env); - io = &info->eti_io; - - result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco)); - if (result < 0) - GOTO(out, result); - LASSERT(result == 0); - - result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0); - cl_io_fini(env, io); - - EXIT; -out: - cl_env_put(env, &refcheck); - return result; + spin_lock(&ec->ec_lock); + if (list_empty(&el->el_chain)) { + list_add(&el->el_chain, &ec->ec_locks); + el->el_cookie = ++ec->ec_unique; + } + atomic_inc(&el->el_refcount); + *cookie = el->el_cookie; + spin_unlock(&ec->ec_lock); + } else { + cl_lock_release(env, lck, "ec enqueue", current); + } + } + RETURN(rc); } static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed, @@ -1199,25 +1197,25 @@ static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed, { struct echo_client_obd *ec = ed->ed_ec; struct echo_lock *ecl = NULL; - cfs_list_t *el; + struct list_head *el; int found = 0, still_used = 0; ENTRY; LASSERT(ec != NULL); - cfs_spin_lock (&ec->ec_lock); - cfs_list_for_each (el, &ec->ec_locks) { - ecl = cfs_list_entry (el, struct echo_lock, el_chain); + spin_lock(&ec->ec_lock); + list_for_each(el, &ec->ec_locks) { + ecl = list_entry(el, struct echo_lock, el_chain); CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie); found = (ecl->el_cookie == cookie); if (found) { - if (cfs_atomic_dec_and_test(&ecl->el_refcount)) - cfs_list_del_init(&ecl->el_chain); + if (atomic_dec_and_test(&ecl->el_refcount)) + list_del_init(&ecl->el_chain); else still_used = 1; break; } } - cfs_spin_unlock (&ec->ec_lock); + spin_unlock(&ec->ec_lock); if (!found) RETURN(-ENOENT); @@ -1226,43 +1224,21 @@ static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed, RETURN(0); } -static int cl_echo_cancel(struct echo_device *ed, __u64 cookie) +static void echo_commit_callback(const struct lu_env *env, struct cl_io *io, + struct cl_page *page) { - struct lu_env *env; - int refcheck; - int rc; - ENTRY; + struct echo_thread_info *info; + struct cl_2queue *queue; - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); + info = echo_env_info(env); + LASSERT(io == &info->eti_io); - rc = cl_echo_cancel0(env, ed, cookie); - - cl_env_put(env, &refcheck); - RETURN(rc); -} - -static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io, - enum cl_req_type unused, struct cl_2queue *queue) -{ - struct cl_page *clp; - struct cl_page *temp; - int result = 0; - ENTRY; - - cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) { - int rc; - rc = cl_page_cache_add(env, io, clp, CRT_WRITE); - if (rc == 0) - continue; - result = result ?: rc; - } - RETURN(result); + queue = &info->eti_queue; + cl_page_list_add(&queue->c2_qout, page); } static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset, - cfs_page_t **pages, int npages, int async) + struct page **pages, int npages, int async) { struct lu_env *env; struct echo_thread_info *info; @@ -1289,6 +1265,8 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset, queue = &info->eti_queue; cl_2queue_init(queue); + + io->ci_ignore_layout = 1; rc = cl_io_init(env, io, CIT_MISC, obj); if (rc < 0) GOTO(out, rc); @@ -1296,7 +1274,7 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset, rc = cl_echo_enqueue0(env, eco, offset, - offset + npages * CFS_PAGE_SIZE - 1, + offset + npages * PAGE_CACHE_SIZE - 1, rw == READ ? LCK_PR : LCK_PW, &lh.cookie, CEF_NEVER); if (rc < 0) @@ -1334,10 +1312,11 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset, async = async && (typ == CRT_WRITE); if (async) - rc = cl_echo_async_brw(env, io, typ, queue); - else - rc = cl_io_submit_sync(env, io, typ, queue, - CRP_NORMAL, 0); + rc = cl_io_commit_async(env, io, &queue->c2_qin, + 0, PAGE_SIZE, + echo_commit_callback); + else + rc = cl_io_submit_sync(env, io, typ, queue, 0); CDEBUG(D_INFO, "echo_client %s write returns %d\n", async ? "async" : "sync", rc); } @@ -1359,27 +1338,6 @@ out: static obd_id last_object_id; static int -echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob) -{ - struct lov_stripe_md *ulsm = _ulsm; - int nob, i; - - nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]); - if (nob > ulsm_nob) - return (-EINVAL); - - if (cfs_copy_to_user (ulsm, lsm, sizeof(ulsm))) - return (-EFAULT); - - for (i = 0; i < lsm->lsm_stripe_count; i++) { - if (cfs_copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i], - sizeof(lsm->lsm_oinfo[0]))) - return (-EFAULT); - } - return 0; -} - -static int echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm, void *ulsm, int ulsm_nob) { @@ -1389,7 +1347,7 @@ echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm, if (ulsm_nob < sizeof (*lsm)) return (-EINVAL); - if (cfs_copy_from_user (lsm, ulsm, sizeof (*lsm))) + if (copy_from_user (lsm, ulsm, sizeof (*lsm))) return (-EFAULT); if (lsm->lsm_stripe_count > ec->ec_nstripes || @@ -1400,7 +1358,7 @@ echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm, for (i = 0; i < lsm->lsm_stripe_count; i++) { - if (cfs_copy_from_user(lsm->lsm_oinfo[i], + if (copy_from_user(lsm->lsm_oinfo[i], ((struct lov_stripe_md *)ulsm)-> \ lsm_oinfo[i], sizeof(lsm->lsm_oinfo[0]))) @@ -1409,32 +1367,157 @@ echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm, return (0); } +#ifdef HAVE_SERVER_SUPPORT static inline void echo_md_build_name(struct lu_name *lname, char *name, - __u64 id) -{ - sprintf(name, "%llu", id); - lname->ln_name = name; - lname->ln_namelen = strlen(name); + __u64 id) +{ + sprintf(name, LPU64, id); + lname->ln_name = name; + lname->ln_namelen = strlen(name); +} + +/* similar to mdt_attr_get_complex */ +static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o, + struct md_attr *ma) +{ + struct echo_thread_info *info = echo_env_info(env); + int rc; + + ENTRY; + + LASSERT(ma->ma_lmm_size > 0); + + rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV); + if (rc < 0) + RETURN(rc); + + /* big_lmm may need to be grown */ + if (info->eti_big_lmmsize < rc) { + int size = size_roundup_power2(rc); + + if (info->eti_big_lmmsize > 0) { + /* free old buffer */ + LASSERT(info->eti_big_lmm); + OBD_FREE_LARGE(info->eti_big_lmm, + info->eti_big_lmmsize); + info->eti_big_lmm = NULL; + info->eti_big_lmmsize = 0; + } + + OBD_ALLOC_LARGE(info->eti_big_lmm, size); + if (info->eti_big_lmm == NULL) + RETURN(-ENOMEM); + info->eti_big_lmmsize = size; + } + LASSERT(info->eti_big_lmmsize >= rc); + + info->eti_buf.lb_buf = info->eti_big_lmm; + info->eti_buf.lb_len = info->eti_big_lmmsize; + rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV); + if (rc < 0) + RETURN(rc); + + ma->ma_valid |= MA_LOV; + ma->ma_lmm = info->eti_big_lmm; + ma->ma_lmm_size = rc; + + RETURN(0); +} + +static int echo_attr_get_complex(const struct lu_env *env, + struct md_object *next, + struct md_attr *ma) +{ + struct echo_thread_info *info = echo_env_info(env); + struct lu_buf *buf = &info->eti_buf; + umode_t mode = lu_object_attr(&next->mo_lu); + int need = ma->ma_need; + int rc = 0, rc2; + + ENTRY; + + ma->ma_valid = 0; + + if (need & MA_INODE) { + ma->ma_need = MA_INODE; + rc = mo_attr_get(env, next, ma); + if (rc) + GOTO(out, rc); + ma->ma_valid |= MA_INODE; + } + + if (need & MA_LOV) { + if (S_ISREG(mode) || S_ISDIR(mode)) { + LASSERT(ma->ma_lmm_size > 0); + buf->lb_buf = ma->ma_lmm; + buf->lb_len = ma->ma_lmm_size; + rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV); + if (rc2 > 0) { + ma->ma_lmm_size = rc2; + ma->ma_valid |= MA_LOV; + } else if (rc2 == -ENODATA) { + /* no LOV EA */ + ma->ma_lmm_size = 0; + } else if (rc2 == -ERANGE) { + rc2 = echo_big_lmm_get(env, next, ma); + if (rc2 < 0) + GOTO(out, rc = rc2); + } else { + GOTO(out, rc = rc2); + } + } + } + +#ifdef CONFIG_FS_POSIX_ACL + if (need & MA_ACL_DEF && S_ISDIR(mode)) { + buf->lb_buf = ma->ma_acl; + buf->lb_len = ma->ma_acl_size; + rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT); + if (rc2 > 0) { + ma->ma_acl_size = rc2; + ma->ma_valid |= MA_ACL_DEF; + } else if (rc2 == -ENODATA) { + /* no ACLs */ + ma->ma_acl_size = 0; + } else { + GOTO(out, rc = rc2); + } + } +#endif +out: + ma->ma_need = need; + CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n", + rc, ma->ma_valid, ma->ma_lmm); + RETURN(rc); } -static int echo_md_create_internal(const struct lu_env *env, - struct echo_device *ed, - struct md_object *parent, - struct lu_fid *fid, - struct lu_name *lname, - struct md_op_spec *spec, - struct md_attr *ma) -{ - struct lu_object *ec_child, *child; - struct lu_device *ld = ed->ed_next; - int rc; - - ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, - fid, NULL); +static int +echo_md_create_internal(const struct lu_env *env, struct echo_device *ed, + struct md_object *parent, struct lu_fid *fid, + struct lu_name *lname, struct md_op_spec *spec, + struct md_attr *ma) +{ + struct lu_object *ec_child, *child; + struct lu_device *ld = ed->ed_next; + struct echo_thread_info *info = echo_env_info(env); + struct lu_fid *fid2 = &info->eti_fid2; + struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; + int rc; + + ENTRY; + + rc = mdo_lookup(env, parent, lname, fid2, spec); + if (rc == 0) + return -EEXIST; + else if (rc != -ENOENT) + return rc; + + ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, + fid, &conf); if (IS_ERR(ec_child)) { CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid), PTR_ERR(ec_child)); - return PTR_ERR(ec_child); + RETURN(PTR_ERR(ec_child)); } child = lu_object_locate(ec_child->lo_header, ld->ld_type); @@ -1446,6 +1529,10 @@ static int echo_md_create_internal(const struct lu_env *env, CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n", PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent); + /* + * Do not perform lookup sanity check. We know that name does not exist. + */ + spec->sp_cr_lookup = 0; rc = mdo_create(env, parent, lname, lu2md(child), spec, ma); if (rc) { CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc); @@ -1453,44 +1540,27 @@ static int echo_md_create_internal(const struct lu_env *env, } CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc = %d\n", PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc); + EXIT; out_put: lu_object_put(env, ec_child); return rc; } -static int echo_set_lmm_size(const struct lu_env *env, - struct lu_device *ld, - struct md_attr *ma) +static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld, + struct md_attr *ma) { - struct md_device *md = lu2md_dev(ld); - int lmm_size, cookie_size, rc; - ENTRY; - - md = lu2md_dev(ld); - rc = md->md_ops->mdo_maxsize_get(env, md, - &lmm_size, &cookie_size); - if (rc) - RETURN(rc); + struct echo_thread_info *info = echo_env_info(env); - ma->ma_lmm_size = lmm_size; - if (lmm_size > 0) { - OBD_ALLOC(ma->ma_lmm, lmm_size); - if (ma->ma_lmm == NULL) { - ma->ma_lmm_size = 0; - RETURN(-ENOMEM); - } - } - - ma->ma_cookie_size = cookie_size; - if (cookie_size > 0) { - OBD_ALLOC(ma->ma_cookie, cookie_size); - if (ma->ma_cookie == NULL) { - ma->ma_cookie_size = 0; - RETURN(-ENOMEM); - } - } + if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) { + ma->ma_lmm = (void *)&info->eti_lmm; + ma->ma_lmm_size = sizeof(info->eti_lmm); + } else { + LASSERT(info->eti_big_lmmsize); + ma->ma_lmm = info->eti_big_lmm; + ma->ma_lmm_size = info->eti_big_lmmsize; + } - RETURN(0); + return 0; } static int echo_create_md_object(const struct lu_env *env, @@ -1510,32 +1580,34 @@ static int echo_create_md_object(const struct lu_env *env, int rc = 0; int i; + ENTRY; + + if (ec_parent == NULL) + return -1; parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - RETURN(PTR_ERR(parent)); - } + if (parent == NULL) + RETURN(-ENXIO); memset(ma, 0, sizeof(*ma)); memset(spec, 0, sizeof(*spec)); if (stripe_count != 0) { spec->sp_cr_flags |= FMODE_WRITE; - rc = echo_set_lmm_size(env, ld, ma); - if (rc) - GOTO(out_free, rc); + echo_set_lmm_size(env, ld, ma); if (stripe_count != -1) { struct lov_user_md_v3 *lum = &info->eti_lum; + lum->lmm_magic = LOV_USER_MAGIC_V3; lum->lmm_stripe_count = stripe_count; lum->lmm_stripe_offset = stripe_offset; lum->lmm_pattern = 0; spec->u.sp_ea.eadata = lum; + spec->u.sp_ea.eadatalen = sizeof(*lum); spec->sp_cr_flags |= MDS_OPEN_HAS_EA; } } ma->ma_attr.la_mode = mode; - ma->ma_attr.la_valid = LA_CTIME; + ma->ma_attr.la_valid = LA_CTIME | LA_MODE; ma->ma_attr.la_ctime = cfs_time_current_64(); if (name != NULL) { @@ -1544,7 +1616,7 @@ static int echo_create_md_object(const struct lu_env *env, /* If name is specified, only create one object by name */ rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname, spec, ma); - GOTO(out_free, rc); + RETURN(rc); } /* Create multiple object sequenced by id */ @@ -1564,13 +1636,7 @@ static int echo_create_md_object(const struct lu_env *env, fid->f_oid++; } -out_free: - if (ma->ma_lmm_size > 0 && ma->ma_lmm != NULL) - OBD_FREE(ma->ma_lmm, ma->ma_lmm_size); - if (ma->ma_cookie_size > 0 && ma->ma_cookie != NULL) - OBD_FREE(ma->ma_cookie, ma->ma_cookie_size); - - return rc; + RETURN(rc); } static struct lu_object *echo_md_lookup(const struct lu_env *env, @@ -1592,6 +1658,9 @@ static struct lu_object *echo_md_lookup(const struct lu_env *env, RETURN(ERR_PTR(rc)); } + /* In the function below, .hs_keycmp resolves to + * lu_obj_hop_keycmp() */ + /* coverity[overrun-buffer-val] */ child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL); RETURN(child); @@ -1611,14 +1680,14 @@ static int echo_setattr_object(const struct lu_env *env, int rc = 0; int i; + ENTRY; + + if (ec_parent == NULL) + return -1; parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - return PTR_ERR(parent); - } + if (parent == NULL) + RETURN(-ENXIO); - buf->lb_buf = info->eti_xattr_buf; - buf->lb_len = sizeof(info->eti_xattr_buf); for (i = 0; i < count; i++) { struct lu_object *ec_child, *child; @@ -1642,10 +1711,13 @@ static int echo_setattr_object(const struct lu_env *env, CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n", PFID(lu_object_fid(child))); + buf->lb_buf = info->eti_xattr_buf; + buf->lb_len = sizeof(info->eti_xattr_buf); + sprintf(name, "%s.test1", XATTR_USER_PREFIX); rc = mo_xattr_set(env, lu2md(child), buf, name, LU_XATTR_CREATE); - if (rc) { + if (rc < 0) { CERROR("Can not setattr child "DFID": rc = %d\n", PFID(lu_object_fid(child)), rc); lu_object_put(env, ec_child); @@ -1656,7 +1728,7 @@ static int echo_setattr_object(const struct lu_env *env, id++; lu_object_put(env, ec_child); } - return rc; + RETURN(rc); } static int echo_getattr_object(const struct lu_env *env, @@ -1673,17 +1745,15 @@ static int echo_getattr_object(const struct lu_env *env, int rc = 0; int i; + ENTRY; + + if (ec_parent == NULL) + return -1; parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - return PTR_ERR(parent); - } + if (parent == NULL) + RETURN(-ENXIO); memset(ma, 0, sizeof(*ma)); - rc = echo_set_lmm_size(env, ld, ma); - if (rc) - GOTO(out_free, rc); - ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF; ma->ma_acl = info->eti_xattr_buf; ma->ma_acl_size = sizeof(info->eti_xattr_buf); @@ -1693,6 +1763,7 @@ static int echo_getattr_object(const struct lu_env *env, ma->ma_valid = 0; echo_md_build_name(lname, name, id); + echo_set_lmm_size(env, ld, ma); ec_child = echo_md_lookup(env, ed, lu2md(parent), lname); if (IS_ERR(ec_child)) { @@ -1705,12 +1776,12 @@ static int echo_getattr_object(const struct lu_env *env, if (child == NULL) { CERROR("Can not locate the child %s\n", lname->ln_name); lu_object_put(env, ec_child); - GOTO(out_free, rc = -EINVAL); + RETURN(-EINVAL); } CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n", PFID(lu_object_fid(child))); - rc = mo_attr_get(env, lu2md(child), ma); + rc = echo_attr_get_complex(env, lu2md(child), ma); if (rc) { CERROR("Can not getattr child "DFID": rc = %d\n", PFID(lu_object_fid(child)), rc); @@ -1723,12 +1794,7 @@ static int echo_getattr_object(const struct lu_env *env, lu_object_put(env, ec_child); } -out_free: - if (ma->ma_lmm_size > 0 && ma->ma_lmm != NULL) - OBD_FREE(ma->ma_lmm, ma->ma_lmm_size); - if (ma->ma_cookie_size > 0 && ma->ma_cookie != NULL) - OBD_FREE(ma->ma_cookie, ma->ma_cookie_size); - return rc; + RETURN(rc); } static int echo_lookup_object(const struct lu_env *env, @@ -1745,11 +1811,11 @@ static int echo_lookup_object(const struct lu_env *env, int rc = 0; int i; + if (ec_parent == NULL) + return -1; parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - return PTR_ERR(parent); - } + if (parent == NULL) + return -ENXIO; /*prepare the requests*/ for (i = 0; i < count; i++) { @@ -1782,6 +1848,8 @@ static int echo_md_destroy_internal(const struct lu_env *env, struct lu_object *child; int rc; + ENTRY; + ec_child = echo_md_lookup(env, ed, parent, lname); if (IS_ERR(ec_child)) { CERROR("Can't find child %s: rc = %ld\n", lname->ln_name, @@ -1795,23 +1863,20 @@ static int echo_md_destroy_internal(const struct lu_env *env, GOTO(out_put, rc = -EINVAL); } + if (lu_object_remote(child)) { + CERROR("Can not destroy remote object %s: rc = %d\n", + lname->ln_name, -EPERM); + GOTO(out_put, rc = -EPERM); + } CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n", PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent); -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,3,50,0) - /* After 2.4, MDT will send destroy RPC to OST directly, so no need - * this flag */ - ma->ma_valid |= MA_FLAGS; - ma->ma_attr_flags |= MDS_UNLINK_DESTROY; -#else -#warning "Please remove this after 2.4 (LOD/OSP)" -#endif - rc = mdo_unlink(env, parent, lu2md(child), lname, ma); - if (rc) { - CERROR("Can not unlink child %s: rc = %d\n", - lname->ln_name, rc); - GOTO(out_put, rc); - } + rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0); + if (rc) { + CERROR("Can not unlink child %s: rc = %d\n", + lname->ln_name, rc); + GOTO(out_put, rc); + } CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n", PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent); out_put: @@ -1846,22 +1911,18 @@ static int echo_destroy_object(const struct lu_env *env, ma->ma_need = MA_INODE; ma->ma_valid = 0; - rc = echo_set_lmm_size(env, ld, ma); - if (rc) - GOTO(out_free, rc); if (name != NULL) { lname->ln_name = name; lname->ln_namelen = namelen; rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname, ma); - GOTO(out_free, rc); + RETURN(rc); } /*prepare the requests*/ for (i = 0; i < count; i++) { char *tmp_name = info->eti_name; - ma->ma_need |= MA_LOV; ma->ma_valid = 0; echo_md_build_name(lname, tmp_name, id); @@ -1874,12 +1935,7 @@ static int echo_destroy_object(const struct lu_env *env, id++; } -out_free: - if (ma->ma_lmm_size > 0 && ma->ma_lmm != NULL) - OBD_FREE(ma->ma_lmm, ma->ma_lmm_size); - if (ma->ma_cookie_size > 0 && ma->ma_cookie != NULL) - OBD_FREE(ma->ma_cookie, ma->ma_cookie_size); - RETURN(rc); + RETURN(rc); } static struct lu_object *echo_resolve_path(const struct lu_env *env, @@ -1903,6 +1959,9 @@ static struct lu_object *echo_resolve_path(const struct lu_env *env, RETURN(ERR_PTR(rc)); } + /* In the function below, .hs_keycmp resolves to + * lu_obj_hop_keycmp() */ + /* coverity[overrun-buffer-val] */ parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL); if (IS_ERR(parent)) { CERROR("Can not find the parent "DFID": rc = %ld\n", @@ -1951,13 +2010,44 @@ static struct lu_object *echo_resolve_path(const struct lu_env *env, RETURN(parent); } -#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_NOREF | LCT_MD_THREAD) -#define ECHO_MD_SES_TAG (LCT_SESSION | LCT_REMEMBER | LCT_NOREF) +static void echo_ucred_init(struct lu_env *env) +{ + struct lu_ucred *ucred = lu_ucred(env); + + ucred->uc_valid = UCRED_INVALID; + + ucred->uc_suppgids[0] = -1; + ucred->uc_suppgids[1] = -1; + + ucred->uc_uid = ucred->uc_o_uid = + from_kuid(&init_user_ns, current_uid()); + ucred->uc_gid = ucred->uc_o_gid = + from_kgid(&init_user_ns, current_gid()); + ucred->uc_fsuid = ucred->uc_o_fsuid = + from_kuid(&init_user_ns, current_fsuid()); + ucred->uc_fsgid = ucred->uc_o_fsgid = + from_kgid(&init_user_ns, current_fsgid()); + ucred->uc_cap = cfs_curproc_cap_pack(); + + /* remove fs privilege for non-root user. */ + if (ucred->uc_fsuid) + ucred->uc_cap &= ~CFS_CAP_FS_MASK; + ucred->uc_valid = UCRED_NEW; +} + +static void echo_ucred_fini(struct lu_env *env) +{ + struct lu_ucred *ucred = lu_ucred(env); + ucred->uc_valid = UCRED_INIT; +} +#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD) +#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION) static int echo_md_handler(struct echo_device *ed, int command, - char *path, int path_len, int id, int count, - struct obd_ioctl_data *data) + char *path, int path_len, __u64 id, int count, + struct obd_ioctl_data *data) { + struct echo_thread_info *info; struct lu_device *ld = ed->ed_next; struct lu_env *env; int refcheck; @@ -1982,29 +2072,34 @@ static int echo_md_handler(struct echo_device *ed, int command, RETURN(PTR_ERR(env)); rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG); - if (rc != 0) { - cl_env_put(env, &refcheck); - RETURN(rc); - } + if (rc != 0) + GOTO(out_env, rc); + + /* init big_lmm buffer */ + info = echo_env_info(env); + LASSERT(info->eti_big_lmm == NULL); + OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE); + if (info->eti_big_lmm == NULL) + GOTO(out_env, rc = -ENOMEM); + info->eti_big_lmmsize = MIN_MD_SIZE; parent = echo_resolve_path(env, ed, path, path_len); if (IS_ERR(parent)) { CERROR("Can not resolve the path %s: rc = %ld\n", path, PTR_ERR(parent)); - cl_env_put(env, &refcheck); - RETURN(PTR_ERR(parent)); + GOTO(out_free, rc = PTR_ERR(parent)); } if (namelen > 0) { OBD_ALLOC(name, namelen + 1); if (name == NULL) - RETURN(-ENOMEM); - if (cfs_copy_from_user(name, data->ioc_pbuf2, namelen)) { - OBD_FREE(name, namelen + 1); - RETURN(-EFAULT); - } + GOTO(out_put, rc = -ENOMEM); + if (copy_from_user(name, data->ioc_pbuf2, namelen)) + GOTO(out_name, rc = -EFAULT); } + echo_ucred_init(env); + switch (command) { case ECHO_MD_CREATE: case ECHO_MD_MKDIR: { @@ -2014,9 +2109,13 @@ static int echo_md_handler(struct echo_device *ed, int command, int stripe_count = (int)data->ioc_obdo2.o_misc; int stripe_index = (int)data->ioc_obdo2.o_stripe_idx; - fid->f_seq = data->ioc_obdo1.o_seq; - fid->f_oid = (__u32)data->ioc_obdo1.o_id; - fid->f_ver = 0; + rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0); + if (rc != 0) + break; + + /* In the function below, .hs_keycmp resolves to + * lu_obj_hop_keycmp() */ + /* coverity[overrun-buffer-val] */ rc = echo_create_md_object(env, ed, parent, fid, name, namelen, id, mode, count, stripe_count, stripe_index); @@ -2044,12 +2143,23 @@ static int echo_md_handler(struct echo_device *ed, int command, rc = -EINVAL; break; } + echo_ucred_fini(env); + +out_name: if (name != NULL) OBD_FREE(name, namelen + 1); +out_put: lu_object_put(env, parent); +out_free: + LASSERT(info->eti_big_lmm); + OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize); + info->eti_big_lmm = NULL; + info->eti_big_lmmsize = 0; +out_env: cl_env_put(env, &refcheck); return rc; } +#endif /* HAVE_SERVER_SUPPORT */ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, int on_target, struct obdo *oa, void *ulsm, @@ -2069,7 +2179,7 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, RETURN(-EINVAL); } - rc = obd_alloc_memmd(ec->ec_exp, &lsm); + rc = echo_alloc_memmd(ed, &lsm); if (rc < 0) { CERROR("Cannot allocate md: rc = %d\n", rc); GOTO(failed, rc); @@ -2086,42 +2196,44 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, lsm->lsm_stripe_count = ec->ec_nstripes; if (lsm->lsm_stripe_size == 0) - lsm->lsm_stripe_size = CFS_PAGE_SIZE; + lsm->lsm_stripe_size = PAGE_CACHE_SIZE; idx = cfs_rand(); - /* setup stripes: indices + default ids if required */ - for (i = 0; i < lsm->lsm_stripe_count; i++) { - if (lsm->lsm_oinfo[i]->loi_id == 0) - lsm->lsm_oinfo[i]->loi_id = lsm->lsm_object_id; + /* setup stripes: indices + default ids if required */ + for (i = 0; i < lsm->lsm_stripe_count; i++) { + if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0) + lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi; - lsm->lsm_oinfo[i]->loi_ost_idx = - (idx + i) % ec->ec_nstripes; - } + lsm->lsm_oinfo[i]->loi_ost_idx = + (idx + i) % ec->ec_nstripes; + } } - /* setup object ID here for !on_target and LOV hint */ - if (oa->o_valid & OBD_MD_FLID) - lsm->lsm_object_id = oa->o_id; + /* setup object ID here for !on_target and LOV hint */ + if (oa->o_valid & OBD_MD_FLID) { + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + lsm->lsm_oi = oa->o_oi; + } - if (lsm->lsm_object_id == 0) - lsm->lsm_object_id = ++last_object_id; + if (ostid_id(&lsm->lsm_oi) == 0) + ostid_set_id(&lsm->lsm_oi, ++last_object_id); rc = 0; - if (on_target) { - /* Only echo objects are allowed to be created */ - LASSERT((oa->o_valid & OBD_MD_FLGROUP) && - (oa->o_seq == FID_SEQ_ECHO)); - rc = obd_create(env, ec->ec_exp, oa, &lsm, oti); - if (rc != 0) { - CERROR("Cannot create objects: rc = %d\n", rc); - GOTO(failed, rc); - } - created = 1; - } + if (on_target) { + /* Only echo objects are allowed to be created */ + LASSERT((oa->o_valid & OBD_MD_FLGROUP) && + (ostid_seq(&oa->o_oi) == FID_SEQ_ECHO)); + rc = obd_create(env, ec->ec_exp, oa, &lsm, oti); + if (rc != 0) { + CERROR("Cannot create objects: rc = %d\n", rc); + GOTO(failed, rc); + } + created = 1; + } /* See what object ID we were given */ - oa->o_id = lsm->lsm_object_id; + oa->o_oi = lsm->lsm_oi; oa->o_valid |= OBD_MD_FLID; eco = cl_echo_object_find(ed, &lsm); @@ -2129,14 +2241,14 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, GOTO(failed, rc = PTR_ERR(eco)); cl_echo_object_put(eco); - CDEBUG(D_INFO, "oa->o_id = %lx\n", (long)oa->o_id); + CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi)); EXIT; failed: if (created && rc) obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL); if (lsm) - obd_free_memmd(ec->ec_exp, &lsm); + echo_free_memmd(ed, &lsm); if (rc) CERROR("create object failed with: rc = %d\n", rc); return (rc); @@ -2145,28 +2257,24 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, struct obdo *oa) { - struct echo_client_obd *ec = ed->ed_ec; struct lov_stripe_md *lsm = NULL; struct echo_object *eco; int rc; ENTRY; - if ((oa->o_valid & OBD_MD_FLID) == 0 || - oa->o_id == 0) /* disallow use of object id 0 */ - { + if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) { + /* disallow use of object id 0 */ CERROR ("No valid oid\n"); RETURN(-EINVAL); } - rc = obd_alloc_memmd(ec->ec_exp, &lsm); + rc = echo_alloc_memmd(ed, &lsm); if (rc < 0) RETURN(rc); - lsm->lsm_object_id = oa->o_id; - if (oa->o_valid & OBD_MD_FLGROUP) - lsm->lsm_object_seq = oa->o_seq; - else - lsm->lsm_object_seq = FID_SEQ_ECHO; + lsm->lsm_oi = oa->o_oi; + if (!(oa->o_valid & OBD_MD_FLGROUP)) + ostid_set_seq_echo(&lsm->lsm_oi); rc = 0; eco = cl_echo_object_find(ed, &lsm); @@ -2175,14 +2283,18 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, else rc = PTR_ERR(eco); if (lsm) - obd_free_memmd(ec->ec_exp, &lsm); + echo_free_memmd(ed, &lsm); RETURN(rc); } static void echo_put_object(struct echo_object *eco) { - if (cl_echo_object_put(eco)) - CERROR("echo client: drop an object failed"); + int rc; + + rc = cl_echo_object_put(eco); + if (rc) + CERROR("%s: echo client drop an object failed: rc = %d\n", + eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc); } static void @@ -2210,13 +2322,13 @@ echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp) stripe_index = woffset / stripe_size; - *idp = lsm->lsm_oinfo[stripe_index]->loi_id; - *offp = offset * stripe_size + woffset % stripe_size; + *idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi); + *offp = offset * stripe_size + woffset % stripe_size; } static void echo_client_page_debug_setup(struct lov_stripe_md *lsm, - cfs_page_t *page, int rw, obd_id id, + struct page *page, int rw, obd_id id, obd_off offset, obd_off count) { char *addr; @@ -2225,11 +2337,11 @@ echo_client_page_debug_setup(struct lov_stripe_md *lsm, int delta; /* no partial pages on the client */ - LASSERT(count == CFS_PAGE_SIZE); + LASSERT(count == PAGE_CACHE_SIZE); - addr = cfs_kmap(page); + addr = kmap(page); - for (delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { + for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { if (rw == OBD_BRW_WRITE) { stripe_off = offset + delta; stripe_id = id; @@ -2242,11 +2354,11 @@ echo_client_page_debug_setup(struct lov_stripe_md *lsm, stripe_off, stripe_id); } - cfs_kunmap(page); + kunmap(page); } static int echo_client_page_debug_check(struct lov_stripe_md *lsm, - cfs_page_t *page, obd_id id, + struct page *page, obd_id id, obd_off offset, obd_off count) { obd_off stripe_off; @@ -2257,11 +2369,11 @@ static int echo_client_page_debug_check(struct lov_stripe_md *lsm, int rc2; /* no partial pages on the client */ - LASSERT(count == CFS_PAGE_SIZE); + LASSERT(count == PAGE_CACHE_SIZE); - addr = cfs_kmap(page); + addr = kmap(page); - for (rc = delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { + for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { stripe_off = offset + delta; stripe_id = id; echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id); @@ -2275,7 +2387,7 @@ static int echo_client_page_debug_check(struct lov_stripe_md *lsm, } } - cfs_kunmap(page); + kunmap(page); return rc; } @@ -2288,7 +2400,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, obd_count npages; struct brw_page *pga; struct brw_page *pgp; - cfs_page_t **pages; + struct page **pages; obd_off off; int i; int rc; @@ -2297,22 +2409,22 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, int brw_flags = 0; ENTRY; - verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID && + verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID && (oa->o_valid & OBD_MD_FLFLAGS) != 0 && (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0); - gfp_mask = ((oa->o_id & 2) == 0) ? CFS_ALLOC_STD : CFS_ALLOC_HIGHUSER; + gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER; - LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ); - LASSERT(lsm != NULL); - LASSERT(lsm->lsm_object_id == oa->o_id); + LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ); + LASSERT(lsm != NULL); + LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi)); if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0) RETURN(-EINVAL); /* XXX think again with misaligned I/O */ - npages = count >> CFS_PAGE_SHIFT; + npages = count >> PAGE_CACHE_SHIFT; if (rw == OBD_BRW_WRITE) brw_flags = OBD_BRW_ASYNC; @@ -2329,7 +2441,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, for (i = 0, pgp = pga, off = offset; i < npages; - i++, pgp++, off += CFS_PAGE_SIZE) { + i++, pgp++, off += PAGE_CACHE_SIZE) { LASSERT (pgp->pg == NULL); /* for cleanup */ @@ -2339,13 +2451,14 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, goto out; pages[i] = pgp->pg; - pgp->count = CFS_PAGE_SIZE; + pgp->count = PAGE_CACHE_SIZE; pgp->off = off; pgp->flag = brw_flags; - if (verify) - echo_client_page_debug_setup(lsm, pgp->pg, rw, - oa->o_id, off, pgp->count); + if (verify) + echo_client_page_debug_setup(lsm, pgp->pg, rw, + ostid_id(&oa->o_oi), off, + pgp->count); } /* brw mode can only be used at client */ @@ -2360,25 +2473,27 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, if (pgp->pg == NULL) continue; - if (verify) { - int vrc; - vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id, - pgp->off, pgp->count); - if (vrc != 0 && rc == 0) - rc = vrc; - } - OBD_PAGE_FREE(pgp->pg); + if (verify) { + int vrc; + vrc = echo_client_page_debug_check(lsm, pgp->pg, + ostid_id(&oa->o_oi), + pgp->off, pgp->count); + if (vrc != 0 && rc == 0) + rc = vrc; + } + OBD_PAGE_FREE(pgp->pg); } OBD_FREE(pga, npages * sizeof(*pga)); OBD_FREE(pages, npages * sizeof(*pages)); RETURN(rc); } -static int echo_client_prep_commit(struct obd_export *exp, int rw, - struct obdo *oa, struct echo_object *eco, - obd_off offset, obd_size count, - obd_size batch, struct obd_trans_info *oti, - int async) +static int echo_client_prep_commit(const struct lu_env *env, + struct obd_export *exp, int rw, + struct obdo *oa, struct echo_object *eco, + obd_off offset, obd_size count, + obd_size batch, struct obd_trans_info *oti, + int async) { struct lov_stripe_md *lsm = eco->eo_lsm; struct obd_ioobj ioo; @@ -2386,15 +2501,16 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, struct niobuf_remote *rnb; obd_off off; obd_size npages, tot_pages; - int i, ret = 0; + int i, ret = 0, brw_flags = 0; + ENTRY; - if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 || - (lsm != NULL && lsm->lsm_object_id != oa->o_id)) - RETURN(-EINVAL); + if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 || + (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi))) + RETURN(-EINVAL); - npages = batch >> CFS_PAGE_SHIFT; - tot_pages = count >> CFS_PAGE_SHIFT; + npages = batch >> PAGE_CACHE_SHIFT; + tot_pages = count >> PAGE_CACHE_SHIFT; OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local)); OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote)); @@ -2402,6 +2518,9 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, if (lnb == NULL || rnb == NULL) GOTO(out, ret = -ENOMEM); + if (rw == OBD_BRW_WRITE && async) + brw_flags |= OBD_BRW_ASYNC; + obdo_to_ioobj(oa, &ioo); off = offset; @@ -2412,55 +2531,59 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, if (tot_pages < npages) npages = tot_pages; - for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) { + for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) { rnb[i].offset = off; - rnb[i].len = CFS_PAGE_SIZE; + rnb[i].len = PAGE_CACHE_SIZE; + rnb[i].flags = brw_flags; } ioo.ioo_bufcnt = npages; - oti->oti_transno = 0; lpages = npages; - ret = obd_preprw(NULL, rw, exp, oa, 1, &ioo, rnb, &lpages, + ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages, lnb, oti, NULL); if (ret != 0) GOTO(out, ret); LASSERT(lpages == npages); for (i = 0; i < lpages; i++) { - cfs_page_t *page = lnb[i].page; - - /* read past eof? */ - if (page == NULL && lnb[i].rc == 0) - continue; - - if (async) - lnb[i].flags |= OBD_BRW_ASYNC; - - if (oa->o_id == ECHO_PERSISTENT_OBJID || - (oa->o_valid & OBD_MD_FLFLAGS) == 0 || - (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0) - continue; - - if (rw == OBD_BRW_WRITE) - echo_client_page_debug_setup(lsm, page, rw, - oa->o_id, - rnb[i].offset, - rnb[i].len); - else - echo_client_page_debug_check(lsm, page, - oa->o_id, - rnb[i].offset, - rnb[i].len); - } - - ret = obd_commitrw(NULL, rw, exp, oa, 1, &ioo, - rnb, npages, lnb, oti, ret); + struct page *page = lnb[i].lnb_page; + + /* read past eof? */ + if (page == NULL && lnb[i].lnb_rc == 0) + continue; + + if (async) + lnb[i].lnb_flags |= OBD_BRW_ASYNC; + + if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID || + (oa->o_valid & OBD_MD_FLFLAGS) == 0 || + (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0) + continue; + + if (rw == OBD_BRW_WRITE) + echo_client_page_debug_setup(lsm, page, rw, + ostid_id(&oa->o_oi), + rnb[i].offset, + rnb[i].len); + else + echo_client_page_debug_check(lsm, page, + ostid_id(&oa->o_oi), + rnb[i].offset, + rnb[i].len); + } + + ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, + rnb, npages, lnb, oti, ret); if (ret != 0) GOTO(out, ret); /* Reset oti otherwise it would confuse ldiskfs. */ memset(oti, 0, sizeof(*oti)); + + /* Reuse env context. */ + lu_context_exit((struct lu_context *)&env->le_ctx); + lu_context_enter((struct lu_context *)&env->le_ctx); } out: @@ -2471,17 +2594,18 @@ out: RETURN(ret); } -static int echo_client_brw_ioctl(int rw, struct obd_export *exp, - struct obd_ioctl_data *data) +static int echo_client_brw_ioctl(const struct lu_env *env, int rw, + struct obd_export *exp, + struct obd_ioctl_data *data, + struct obd_trans_info *dummy_oti) { struct obd_device *obd = class_exp2obd(exp); struct echo_device *ed = obd2echo_dev(obd); struct echo_client_obd *ec = ed->ed_ec; - struct obd_trans_info dummy_oti = { 0 }; struct obdo *oa = &data->ioc_obdo1; struct echo_object *eco; int rc; - int async = 1; + int async = 0; long test_mode; ENTRY; @@ -2493,16 +2617,16 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp, oa->o_valid &= ~OBD_MD_FLHANDLE; - /* obdfilter doesn't support obd_brw now, simulate via prep + commit */ + /* OFD/obdfilter works only via prep/commit */ test_mode = (long)data->ioc_pbuf1; - if (test_mode == 1) - async = 0; - if (ed->ed_next == NULL && test_mode != 3) { test_mode = 3; data->ioc_plen1 = data->ioc_count; } + if (test_mode == 3) + async = 1; + /* Truncate batch size to maximum */ if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE) data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE; @@ -2513,13 +2637,13 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp, case 2: rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset, - data->ioc_count, async, &dummy_oti); + data->ioc_count, async, dummy_oti); break; case 3: - rc = echo_client_prep_commit(ec->ec_exp, rw, oa, - eco, data->ioc_offset, - data->ioc_count, data->ioc_plen1, - &dummy_oti, async); + rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, + eco, data->ioc_offset, + data->ioc_count, data->ioc_plen1, + dummy_oti, async); break; default: rc = -EINVAL; @@ -2529,57 +2653,12 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp, } static int -echo_client_enqueue(struct obd_export *exp, struct obdo *oa, - int mode, obd_off offset, obd_size nob) -{ - struct echo_device *ed = obd2echo_dev(exp->exp_obd); - struct lustre_handle *ulh = &oa->o_handle; - struct echo_object *eco; - obd_off end; - int rc; - ENTRY; - - if (ed->ed_next == NULL) - RETURN(-EOPNOTSUPP); - - if (!(mode == LCK_PR || mode == LCK_PW)) - RETURN(-EINVAL); - - if ((offset & (~CFS_PAGE_MASK)) != 0 || - (nob & (~CFS_PAGE_MASK)) != 0) - RETURN(-EINVAL); - - rc = echo_get_object (&eco, ed, oa); - if (rc != 0) - RETURN(rc); - - end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1); - rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie); - if (rc == 0) { - oa->o_valid |= OBD_MD_FLHANDLE; - CDEBUG(D_INFO, "Cookie is "LPX64"\n", ulh->cookie); - } - echo_put_object(eco); - RETURN(rc); -} - -static int -echo_client_cancel(struct obd_export *exp, struct obdo *oa) -{ - struct echo_device *ed = obd2echo_dev(exp->exp_obd); - __u64 cookie = oa->o_handle.cookie; - - if ((oa->o_valid & OBD_MD_FLHANDLE) == 0) - return -EINVAL; - - CDEBUG(D_INFO, "Cookie is "LPX64"\n", cookie); - return cl_echo_cancel(ed, cookie); -} - -static int echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) { +#ifdef HAVE_SERVER_SUPPORT + struct tgt_session_info *tsi; +#endif struct obd_device *obd = exp->exp_obd; struct echo_device *ed = obd2echo_dev(obd); struct echo_client_obd *ec = ed->ed_ec; @@ -2593,18 +2672,21 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, int rw = OBD_BRW_READ; int rc = 0; int i; +#ifdef HAVE_SERVER_SUPPORT + struct lu_context echo_session; +#endif ENTRY; memset(&dummy_oti, 0, sizeof(dummy_oti)); - oa = &data->ioc_obdo1; - if (!(oa->o_valid & OBD_MD_FLGROUP)) { - oa->o_valid |= OBD_MD_FLGROUP; - oa->o_seq = FID_SEQ_ECHO; - } + oa = &data->ioc_obdo1; + if (!(oa->o_valid & OBD_MD_FLGROUP)) { + oa->o_valid |= OBD_MD_FLGROUP; + ostid_set_seq_echo(&oa->o_oi); + } /* This FID is unpacked just for validation at this point */ - rc = fid_ostid_unpack(&fid, &oa->o_oi, 0); + rc = ostid_to_fid(&fid, &oa->o_oi, 0); if (rc < 0) RETURN(rc); @@ -2612,10 +2694,21 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, if (env == NULL) RETURN(-ENOMEM); - rc = lu_env_init(env, LCT_DT_THREAD); - if (rc) - GOTO(out, rc = -ENOMEM); + rc = lu_env_init(env, LCT_DT_THREAD); + if (rc) + GOTO(out_alloc, rc = -ENOMEM); +#ifdef HAVE_SERVER_SUPPORT + env->le_ses = &echo_session; + rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF); + if (unlikely(rc < 0)) + GOTO(out_env, rc); + lu_context_enter(env->le_ses); + + tsi = tgt_ses_info(env); + tsi->tsi_exp = ec->ec_exp; + tsi->tsi_jobid = NULL; +#endif switch (cmd) { case OBD_IOC_CREATE: /* may create echo object */ if (!cfs_capable(CFS_CAP_SYS_ADMIN)) @@ -2625,35 +2718,35 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, data->ioc_plen1, &dummy_oti); GOTO(out, rc); - case OBD_IOC_ECHO_MD: { - int count; - int cmd; - char *dir = NULL; - int dirlen; - __u64 id; - - if (!cfs_capable(CFS_CAP_SYS_ADMIN)) - GOTO(out, rc = -EPERM); - - count = data->ioc_count; - cmd = data->ioc_command; - - id = data->ioc_obdo2.o_id; - - dirlen = data->ioc_plen1; - OBD_ALLOC(dir, dirlen + 1); - if (dir == NULL) - GOTO(out, rc = -ENOMEM); - - if (cfs_copy_from_user(dir, data->ioc_pbuf1, dirlen)) { - OBD_FREE(dir, data->ioc_plen1 + 1); - GOTO(out, rc = -EFAULT); - } - - rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data); - OBD_FREE(dir, dirlen + 1); - GOTO(out, rc); - } +#ifdef HAVE_SERVER_SUPPORT + case OBD_IOC_ECHO_MD: { + int count; + int cmd; + char *dir = NULL; + int dirlen; + __u64 id; + + if (!cfs_capable(CFS_CAP_SYS_ADMIN)) + GOTO(out, rc = -EPERM); + + count = data->ioc_count; + cmd = data->ioc_command; + + id = data->ioc_obdo2.o_oi.oi.oi_id; + dirlen = data->ioc_plen1; + OBD_ALLOC(dir, dirlen + 1); + if (dir == NULL) + GOTO(out, rc = -ENOMEM); + + if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) { + OBD_FREE(dir, data->ioc_plen1 + 1); + GOTO(out, rc = -EFAULT); + } + + rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data); + OBD_FREE(dir, dirlen + 1); + GOTO(out, rc); + } case OBD_IOC_ECHO_ALLOC_SEQ: { struct lu_env *cl_env; int refcheck; @@ -2682,15 +2775,16 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, GOTO(out, rc); } - if (cfs_copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1)) + if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1)) return -EFAULT; - max_count = LUSTRE_SEQ_MAX_WIDTH; - if (cfs_copy_to_user(data->ioc_pbuf2, &max_count, - data->ioc_plen2)) - return -EFAULT; - GOTO(out, rc); + max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH; + if (copy_to_user(data->ioc_pbuf2, &max_count, + data->ioc_plen2)) + return -EFAULT; + GOTO(out, rc); } +#endif /* HAVE_SERVER_SUPPORT */ case OBD_IOC_DESTROY: if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); @@ -2738,49 +2832,9 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rw = OBD_BRW_WRITE; /* fall through */ case OBD_IOC_BRW_READ: - rc = echo_client_brw_ioctl(rw, exp, data); + rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti); GOTO(out, rc); - case ECHO_IOC_GET_STRIPE: - rc = echo_get_object(&eco, ed, oa); - if (rc == 0) { - rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1, - data->ioc_plen1); - echo_put_object(eco); - } - GOTO(out, rc); - - case ECHO_IOC_SET_STRIPE: - if (!cfs_capable(CFS_CAP_SYS_ADMIN)) - GOTO (out, rc = -EPERM); - - if (data->ioc_pbuf1 == NULL) { /* unset */ - rc = echo_get_object(&eco, ed, oa); - if (rc == 0) { - eco->eo_deleted = 1; - echo_put_object(eco); - } - } else { - rc = echo_create_object(env, ed, 0, oa, - data->ioc_pbuf1, - data->ioc_plen1, &dummy_oti); - } - GOTO (out, rc); - - case ECHO_IOC_ENQUEUE: - if (!cfs_capable(CFS_CAP_SYS_ADMIN)) - GOTO (out, rc = -EPERM); - - rc = echo_client_enqueue(exp, oa, - data->ioc_conn1, /* lock mode */ - data->ioc_offset, - data->ioc_count);/*extent*/ - GOTO (out, rc); - - case ECHO_IOC_CANCEL: - rc = echo_client_cancel(exp, oa); - GOTO (out, rc); - default: CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd); GOTO (out, rc = -ENOTTY); @@ -2788,7 +2842,13 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, EXIT; out: +#ifdef HAVE_SERVER_SUPPORT + lu_context_exit(env->le_ses); + lu_context_fini(env->le_ses); +out_env: +#endif lu_env_fini(env); +out_alloc: OBD_FREE_PTR(env); /* XXX this should be in a helper also called by target_send_reply */ @@ -2824,17 +2884,23 @@ static int echo_client_setup(const struct lu_env *env, RETURN(-EINVAL); } - cfs_spin_lock_init (&ec->ec_lock); - CFS_INIT_LIST_HEAD (&ec->ec_objects); - CFS_INIT_LIST_HEAD (&ec->ec_locks); + spin_lock_init(&ec->ec_lock); + INIT_LIST_HEAD(&ec->ec_objects); + INIT_LIST_HEAD(&ec->ec_locks); ec->ec_unique = 0; ec->ec_nstripes = 0; - if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) { - lu_context_tags_update(ECHO_MD_CTX_TAG); - lu_session_tags_update(ECHO_MD_SES_TAG); - RETURN(0); - } + if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) { +#ifdef HAVE_SERVER_SUPPORT + lu_context_tags_update(ECHO_MD_CTX_TAG); + lu_session_tags_update(ECHO_MD_SES_TAG); +#else + CERROR("Local operations are NOT supported on client side. " + "Only remote operations are supported. Metadata client " + "must be run on server side.\n"); +#endif + RETURN(0); + } OBD_ALLOC(ocd, sizeof(*ocd)); if (ocd == NULL) { @@ -2844,17 +2910,20 @@ static int echo_client_setup(const struct lu_env *env, } ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL | + OBD_CONNECT_BRW_SIZE | OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 | - OBD_CONNECT_64BITHASH; + OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE | + OBD_CONNECT_FID; + ocd->ocd_brw_size = DT_MAX_BRW_SIZE; ocd->ocd_version = LUSTRE_VERSION_CODE; ocd->ocd_group = FID_SEQ_ECHO; rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL); if (rc == 0) { /* Turn off pinger because it connects to tgt obd directly. */ - cfs_spin_lock(&tgt->obd_dev_lock); - cfs_list_del_init(&ec->ec_exp->exp_obd_chain_timed); - cfs_spin_unlock(&tgt->obd_dev_lock); + spin_lock(&tgt->obd_dev_lock); + list_del_init(&ec->ec_exp->exp_obd_chain_timed); + spin_unlock(&tgt->obd_dev_lock); } OBD_FREE(ocd, sizeof(*ocd)); @@ -2880,17 +2949,22 @@ static int echo_client_cleanup(struct obd_device *obddev) RETURN(0); if (ed->ed_next_ismd) { - lu_context_tags_clear(ECHO_MD_CTX_TAG); - lu_session_tags_clear(ECHO_MD_SES_TAG); +#ifdef HAVE_SERVER_SUPPORT + lu_context_tags_clear(ECHO_MD_CTX_TAG); + lu_session_tags_clear(ECHO_MD_SES_TAG); +#else + CERROR("This is client-side only module, does not support " + "metadata echo client.\n"); +#endif RETURN(0); } - if (!cfs_list_empty(&obddev->obd_exports)) { + if (!list_empty(&obddev->obd_exports)) { CERROR("still has clients!\n"); RETURN(-EBUSY); } - LASSERT(cfs_atomic_read(&ec->ec_exp->exp_refcount) > 0); + LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0); rc = obd_disconnect(ec->ec_exp); if (rc != 0) CERROR("fail to disconnect device: %d\n", rc); @@ -2917,38 +2991,12 @@ static int echo_client_connect(const struct lu_env *env, static int echo_client_disconnect(struct obd_export *exp) { -#if 0 - struct obd_device *obd; - struct echo_client_obd *ec; - struct ec_lock *ecl; -#endif int rc; ENTRY; if (exp == NULL) GOTO(out, rc = -EINVAL); -#if 0 - obd = exp->exp_obd; - ec = &obd->u.echo_client; - - /* no more contention on export's lock list */ - while (!cfs_list_empty (&exp->exp_ec_data.eced_locks)) { - ecl = cfs_list_entry (exp->exp_ec_data.eced_locks.next, - struct ec_lock, ecl_exp_chain); - cfs_list_del (&ecl->ecl_exp_chain); - - rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm, - ecl->ecl_mode, &ecl->ecl_lock_handle); - - CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect " - "(%d)\n", ecl->ecl_object->eco_id, rc); - - echo_put_object (ecl->ecl_object); - OBD_FREE (ecl, sizeof (*ecl)); - } -#endif - rc = class_disconnect(exp); GOTO(out, rc); out: @@ -2957,12 +3005,6 @@ static int echo_client_disconnect(struct obd_export *exp) static struct obd_ops echo_client_obd_ops = { .o_owner = THIS_MODULE, - -#if 0 - .o_setup = echo_client_setup, - .o_cleanup = echo_client_cleanup, -#endif - .o_iocontrol = echo_client_iocontrol, .o_connect = echo_client_connect, .o_disconnect = echo_client_disconnect @@ -2970,21 +3012,20 @@ static struct obd_ops echo_client_obd_ops = { int echo_client_init(void) { - struct lprocfs_static_vars lvars = { 0 }; int rc; - lprocfs_echo_init_vars(&lvars); - - rc = lu_kmem_init(echo_caches); - if (rc == 0) { - rc = class_register_type(&echo_client_obd_ops, NULL, - lvars.module_vars, - LUSTRE_ECHO_CLIENT_NAME, - &echo_device_type); - if (rc) - lu_kmem_fini(echo_caches); - } - return rc; + rc = lu_kmem_init(echo_caches); + if (rc == 0) { + rc = class_register_type(&echo_client_obd_ops, NULL, true, NULL, +#ifndef HAVE_ONLY_PROCFS_SEQ + NULL, +#endif + LUSTRE_ECHO_CLIENT_NAME, + &echo_device_type); + if (rc) + lu_kmem_fini(echo_caches); + } + return rc; } void echo_client_exit(void) @@ -2996,25 +3037,25 @@ void echo_client_exit(void) #ifdef __KERNEL__ static int __init obdecho_init(void) { - struct lprocfs_static_vars lvars; int rc; ENTRY; LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n"); - LASSERT(CFS_PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0); - - lprocfs_echo_init_vars(&lvars); + LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0); # ifdef HAVE_SERVER_SUPPORT rc = echo_persistent_pages_init(); if (rc != 0) goto failed_0; - rc = class_register_type(&echo_obd_ops, NULL, lvars.module_vars, - LUSTRE_ECHO_NAME, NULL); - if (rc != 0) - goto failed_1; + rc = class_register_type(&echo_obd_ops, NULL, true, NULL, +#ifndef HAVE_ONLY_PROCFS_SEQ + NULL, +#endif + LUSTRE_ECHO_NAME, NULL); + if (rc != 0) + goto failed_1; # endif rc = echo_client_init();