X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdecho%2Fecho_client.c;h=a8f5b0bb128e21e0c241cd1f95489b59b4580c62;hp=5239db56349c602a560e77d79f9d435aecd234ca;hb=3bffa4d32bc5b0bc71ba6873e262ddbca436bae1;hpb=5165cdd4b063d523e5ae261f47818b5ba2bbc7cc diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 5239db5..a8f5b0b 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -50,6 +50,7 @@ #include #include #include +#include #include "echo_internal.h" @@ -87,7 +88,7 @@ struct echo_object_conf { struct echo_page { struct cl_page_slice ep_cl; - cfs_mutex_t ep_lock; + struct mutex ep_lock; cfs_page_t *ep_vmpage; }; @@ -208,6 +209,9 @@ struct echo_thread_info { struct lov_user_md_v3 eti_lum; struct md_attr eti_ma; struct lu_name eti_lname; + /* per-thread values, can be re-used */ + void *eti_big_lmm; + int eti_big_lmmsize; char eti_name[20]; struct lu_buf eti_buf; char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE]; @@ -218,7 +222,6 @@ struct echo_session_info { unsigned long dummy; }; -static cfs_mem_cache_t *echo_page_kmem; static cfs_mem_cache_t *echo_lock_kmem; static cfs_mem_cache_t *echo_object_kmem; static cfs_mem_cache_t *echo_thread_kmem; @@ -227,11 +230,6 @@ static cfs_mem_cache_t *echo_session_kmem; static struct lu_kmem_descr echo_caches[] = { { - .ckd_cache = &echo_page_kmem, - .ckd_name = "echo_page_kmem", - .ckd_size = sizeof (struct echo_page) - }, - { .ckd_cache = &echo_lock_kmem, .ckd_name = "echo_lock_kmem", .ckd_size = sizeof (struct echo_lock) @@ -282,8 +280,8 @@ static int echo_page_own(const struct lu_env *env, struct echo_page *ep = cl2echo_page(slice); if (!nonblock) - cfs_mutex_lock(&ep->ep_lock); - else if (!cfs_mutex_trylock(&ep->ep_lock)) + mutex_lock(&ep->ep_lock); + else if (!mutex_trylock(&ep->ep_lock)) return -EAGAIN; return 0; } @@ -294,8 +292,8 @@ static void echo_page_disown(const struct lu_env *env, { struct echo_page *ep = cl2echo_page(slice); - LASSERT(cfs_mutex_is_locked(&ep->ep_lock)); - cfs_mutex_unlock(&ep->ep_lock); + LASSERT(mutex_is_locked(&ep->ep_lock)); + mutex_unlock(&ep->ep_lock); } static void echo_page_discard(const struct lu_env *env, @@ -308,7 +306,7 @@ static void echo_page_discard(const struct lu_env *env, static int echo_page_is_vmlocked(const struct lu_env *env, const struct cl_page_slice *slice) { - if (cfs_mutex_is_locked(&cl2echo_page(slice)->ep_lock)) + if (mutex_is_locked(&cl2echo_page(slice)->ep_lock)) return -EBUSY; return -ENODATA; } @@ -330,7 +328,6 @@ static void echo_page_fini(const struct lu_env *env, cfs_atomic_dec(&eco->eo_npages); page_cache_release(vmpage); - OBD_SLAB_FREE_PTR(ep, echo_page_kmem); EXIT; } @@ -348,7 +345,7 @@ static int echo_page_print(const struct lu_env *env, struct echo_page *ep = cl2echo_page(slice); (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n", - ep, cfs_mutex_is_locked(&ep->ep_lock), ep->ep_vmpage); + ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage); return 0; } @@ -418,23 +415,19 @@ static struct cl_lock_operations echo_lock_ops = { * * @{ */ -static struct cl_page *echo_page_init(const struct lu_env *env, - struct cl_object *obj, - struct cl_page *page, cfs_page_t *vmpage) +static int echo_page_init(const struct lu_env *env, struct cl_object *obj, + struct cl_page *page, cfs_page_t *vmpage) { - struct echo_page *ep; + struct echo_page *ep = cl_object_page_slice(obj, page); + struct echo_object *eco = cl2echo_obj(obj); ENTRY; - OBD_SLAB_ALLOC_PTR_GFP(ep, echo_page_kmem, CFS_ALLOC_IO); - if (ep != NULL) { - struct echo_object *eco = cl2echo_obj(obj); - ep->ep_vmpage = vmpage; - page_cache_get(vmpage); - cfs_mutex_init(&ep->ep_lock); - cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops); - cfs_atomic_inc(&eco->eo_npages); - } - RETURN(ERR_PTR(ep ? 0 : -ENOMEM)); + ep->ep_vmpage = vmpage; + page_cache_get(vmpage); + mutex_init(&ep->ep_lock); + cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops); + cfs_atomic_inc(&eco->eo_npages); + RETURN(0); } static int echo_io_init(const struct lu_env *env, struct cl_object *obj, @@ -514,12 +507,63 @@ static int echo_object_init(const struct lu_env *env, struct lu_object *obj, eco->eo_dev = ed; cfs_atomic_set(&eco->eo_npages, 0); + cl_object_page_init(lu2cl(obj), sizeof(struct echo_page)); - cfs_spin_lock(&ec->ec_lock); - cfs_list_add_tail(&eco->eo_obj_chain, &ec->ec_objects); - cfs_spin_unlock(&ec->ec_lock); + spin_lock(&ec->ec_lock); + cfs_list_add_tail(&eco->eo_obj_chain, &ec->ec_objects); + spin_unlock(&ec->ec_lock); - RETURN(0); + RETURN(0); +} + +/* taken from osc_unpackmd() */ +static int echo_alloc_memmd(struct echo_device *ed, + struct lov_stripe_md **lsmp) +{ + int lsm_size; + + ENTRY; + + /* If export is lov/osc then use their obd method */ + if (ed->ed_next != NULL) + return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp); + /* OFD has no unpackmd method, do everything here */ + lsm_size = lov_stripe_md_size(1); + + LASSERT(*lsmp == NULL); + OBD_ALLOC(*lsmp, lsm_size); + if (*lsmp == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + if ((*lsmp)->lsm_oinfo[0] == NULL) { + OBD_FREE(*lsmp, lsm_size); + RETURN(-ENOMEM); + } + + loi_init((*lsmp)->lsm_oinfo[0]); + (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + + RETURN(lsm_size); +} + +static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp) +{ + int lsm_size; + + ENTRY; + + /* If export is lov/osc then use their obd method */ + if (ed->ed_next != NULL) + return obd_free_memmd(ed->ed_ec->ec_exp, lsmp); + /* OFD has no unpackmd method, do everything here */ + lsm_size = lov_stripe_md_size(1); + + LASSERT(*lsmp != NULL); + OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + OBD_FREE(*lsmp, lsm_size); + *lsmp = NULL; + RETURN(0); } static void echo_object_free(const struct lu_env *env, struct lu_object *obj) @@ -530,15 +574,15 @@ static void echo_object_free(const struct lu_env *env, struct lu_object *obj) LASSERT(cfs_atomic_read(&eco->eo_npages) == 0); - cfs_spin_lock(&ec->ec_lock); + spin_lock(&ec->ec_lock); cfs_list_del_init(&eco->eo_obj_chain); - cfs_spin_unlock(&ec->ec_lock); + spin_unlock(&ec->ec_lock); lu_object_fini(obj); lu_object_header_fini(obj->lo_header); if (eco->eo_lsm) - obd_free_memmd(ec->ec_exp, &eco->eo_lsm); + echo_free_memmd(eco->eo_dev, &eco->eo_lsm); OBD_SLAB_FREE_PTR(eco, echo_object_kmem); EXIT; } @@ -700,7 +744,7 @@ LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key); #define ECHO_SEQ_WIDTH 0xffffffff static int echo_fid_init(struct echo_device *ed, char *obd_name, - struct md_site *ms) + struct seq_server_site *ss) { char *prefix; int rc; @@ -716,10 +760,10 @@ static int echo_fid_init(struct echo_device *ed, char *obd_name, snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name); - /* Init client side sequence-manager */ - rc = seq_client_init(ed->ed_cl_seq, NULL, - LUSTRE_SEQ_METADATA, - prefix, ms->ms_server_seq); + /* Init client side sequence-manager */ + rc = seq_client_init(ed->ed_cl_seq, NULL, + LUSTRE_SEQ_METADATA, + prefix, ss->ss_server_seq); ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH; OBD_FREE(prefix, MAX_OBD_NAME + 5); if (rc) @@ -826,14 +870,14 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env, ls = next->ld_site; - cfs_spin_lock(&ls->ls_ld_lock); - cfs_list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) { - if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) { - found = 1; - break; - } - } - cfs_spin_unlock(&ls->ls_ld_lock); + spin_lock(&ls->ls_ld_lock); + cfs_list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) { + if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) { + found = 1; + break; + } + } + spin_unlock(&ls->ls_ld_lock); if (found == 0) { CERROR("%s is not lu device type!\n", @@ -846,11 +890,11 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env, ed->ed_site_myself.cs_lu = *ls; ed->ed_site = &ed->ed_site_myself; ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu; - rc = echo_fid_init(ed, obd->obd_name, lu_site2md(ls)); - if (rc) { - CERROR("echo fid init error %d\n", rc); - GOTO(out, rc); - } + rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls)); + if (rc) { + CERROR("echo fid init error %d\n", rc); + GOTO(out, rc); + } } else { /* if echo client is to be stacked upon ost device, the next is * NULL since ost is not a clio device so far */ @@ -961,29 +1005,29 @@ static struct lu_device *echo_device_free(const struct lu_env *env, * all of cached objects. Anyway, probably the echo device is being * parallelly accessed. */ - cfs_spin_lock(&ec->ec_lock); - cfs_list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain) - eco->eo_deleted = 1; - cfs_spin_unlock(&ec->ec_lock); - - /* purge again */ - lu_site_purge(env, &ed->ed_site->cs_lu, -1); - - CDEBUG(D_INFO, - "Waiting for the reference of echo object to be dropped\n"); - - /* Wait for the last reference to be dropped. */ - cfs_spin_lock(&ec->ec_lock); - while (!cfs_list_empty(&ec->ec_objects)) { - cfs_spin_unlock(&ec->ec_lock); - CERROR("echo_client still has objects at cleanup time, " - "wait for 1 second\n"); - cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT, - cfs_time_seconds(1)); - lu_site_purge(env, &ed->ed_site->cs_lu, -1); - cfs_spin_lock(&ec->ec_lock); - } - cfs_spin_unlock(&ec->ec_lock); + spin_lock(&ec->ec_lock); + cfs_list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain) + eco->eo_deleted = 1; + spin_unlock(&ec->ec_lock); + + /* purge again */ + lu_site_purge(env, &ed->ed_site->cs_lu, -1); + + CDEBUG(D_INFO, + "Waiting for the reference of echo object to be dropped\n"); + + /* Wait for the last reference to be dropped. */ + spin_lock(&ec->ec_lock); + while (!cfs_list_empty(&ec->ec_objects)) { + spin_unlock(&ec->ec_lock); + CERROR("echo_client still has objects at cleanup time, " + "wait for 1 second\n"); + cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT, + cfs_time_seconds(1)); + lu_site_purge(env, &ed->ed_site->cs_lu, -1); + spin_lock(&ec->ec_lock); + } + spin_unlock(&ec->ec_lock); LASSERT(cfs_list_empty(&ec->ec_locks)); @@ -1112,7 +1156,7 @@ static int cl_echo_object_put(struct echo_object *eco) if (eco->eo_deleted) { struct lu_object_header *loh = obj->co_lu.lo_header; LASSERT(&eco->eo_hdr == luh2coh(loh)); - cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags); + set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags); } cl_object_put(env, obj); @@ -1152,18 +1196,19 @@ static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco, rc = cl_wait(env, lck); if (rc == 0) { el = cl2echo_lock(cl_lock_at(lck, &echo_device_type)); - cfs_spin_lock(&ec->ec_lock); - if (cfs_list_empty(&el->el_chain)) { - cfs_list_add(&el->el_chain, &ec->ec_locks); - el->el_cookie = ++ec->ec_unique; - } - cfs_atomic_inc(&el->el_refcount); - *cookie = el->el_cookie; - cfs_spin_unlock(&ec->ec_lock); - } else - cl_lock_release(env, lck, "ec enqueue", cfs_current()); - } - RETURN(rc); + spin_lock(&ec->ec_lock); + if (cfs_list_empty(&el->el_chain)) { + cfs_list_add(&el->el_chain, &ec->ec_locks); + el->el_cookie = ++ec->ec_unique; + } + cfs_atomic_inc(&el->el_refcount); + *cookie = el->el_cookie; + spin_unlock(&ec->ec_lock); + } else { + cl_lock_release(env, lck, "ec enqueue", cfs_current()); + } + } + RETURN(rc); } static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end, @@ -1208,7 +1253,7 @@ static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed, ENTRY; LASSERT(ec != NULL); - cfs_spin_lock (&ec->ec_lock); + spin_lock(&ec->ec_lock); cfs_list_for_each (el, &ec->ec_locks) { ecl = cfs_list_entry (el, struct echo_lock, el_chain); CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie); @@ -1221,7 +1266,7 @@ static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed, break; } } - cfs_spin_unlock (&ec->ec_lock); + spin_unlock(&ec->ec_lock); if (!found) RETURN(-ENOENT); @@ -1422,6 +1467,120 @@ static inline void echo_md_build_name(struct lu_name *lname, char *name, lname->ln_namelen = strlen(name); } +/* similar to mdt_attr_get_complex */ +static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o, + struct md_attr *ma) +{ + struct echo_thread_info *info = echo_env_info(env); + int rc; + + ENTRY; + + LASSERT(ma->ma_lmm_size > 0); + + rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV); + if (rc < 0) + RETURN(rc); + + /* big_lmm may need to be grown */ + if (info->eti_big_lmmsize < rc) { + int size = size_roundup_power2(rc); + + if (info->eti_big_lmmsize > 0) { + /* free old buffer */ + LASSERT(info->eti_big_lmm); + OBD_FREE_LARGE(info->eti_big_lmm, + info->eti_big_lmmsize); + info->eti_big_lmm = NULL; + info->eti_big_lmmsize = 0; + } + + OBD_ALLOC_LARGE(info->eti_big_lmm, size); + if (info->eti_big_lmm == NULL) + RETURN(-ENOMEM); + info->eti_big_lmmsize = size; + } + LASSERT(info->eti_big_lmmsize >= rc); + + info->eti_buf.lb_buf = info->eti_big_lmm; + info->eti_buf.lb_len = info->eti_big_lmmsize; + rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV); + if (rc < 0) + RETURN(rc); + + ma->ma_valid |= MA_LOV; + ma->ma_lmm = info->eti_big_lmm; + ma->ma_lmm_size = rc; + + RETURN(0); +} + +int echo_attr_get_complex(const struct lu_env *env, struct md_object *next, + struct md_attr *ma) +{ + struct echo_thread_info *info = echo_env_info(env); + struct lu_buf *buf = &info->eti_buf; + cfs_umode_t mode = lu_object_attr(&next->mo_lu); + int need = ma->ma_need; + int rc = 0, rc2; + + ENTRY; + + ma->ma_valid = 0; + + if (need & MA_INODE) { + ma->ma_need = MA_INODE; + rc = mo_attr_get(env, next, ma); + if (rc) + GOTO(out, rc); + ma->ma_valid |= MA_INODE; + } + + if (need & MA_LOV) { + if (S_ISREG(mode) || S_ISDIR(mode)) { + LASSERT(ma->ma_lmm_size > 0); + buf->lb_buf = ma->ma_lmm; + buf->lb_len = ma->ma_lmm_size; + rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV); + if (rc2 > 0) { + ma->ma_lmm_size = rc2; + ma->ma_valid |= MA_LOV; + } else if (rc2 == -ENODATA) { + /* no LOV EA */ + ma->ma_lmm_size = 0; + } else if (rc2 == -ERANGE) { + rc2 = echo_big_lmm_get(env, next, ma); + if (rc2 < 0) + GOTO(out, rc = rc2); + } else { + GOTO(out, rc = rc2); + } + } + } + +#ifdef CONFIG_FS_POSIX_ACL + if (need & MA_ACL_DEF && S_ISDIR(mode)) { + buf->lb_buf = ma->ma_acl; + buf->lb_len = ma->ma_acl_size; + rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT); + if (rc2 > 0) { + ma->ma_acl_size = rc2; + ma->ma_valid |= MA_ACL_DEF; + } else if (rc2 == -ENODATA) { + /* no ACLs */ + ma->ma_acl_size = 0; + } else { + GOTO(out, rc = rc2); + } + } +#endif +out: + ma->ma_need = need; + CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n", + rc, ma->ma_valid, ma->ma_lmm); + RETURN(rc); +} + static int echo_md_create_internal(const struct lu_env *env, struct echo_device *ed, struct md_object *parent, struct lu_fid *fid, @@ -1435,6 +1594,8 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed, struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; int rc; + ENTRY; + rc = mdo_lookup(env, parent, lname, fid2, spec); if (rc == 0) return -EEXIST; @@ -1446,7 +1607,7 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed, if (IS_ERR(ec_child)) { CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid), PTR_ERR(ec_child)); - return PTR_ERR(ec_child); + RETURN(PTR_ERR(ec_child)); } child = lu_object_locate(ec_child->lo_header, ld->ld_type); @@ -1469,45 +1630,27 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed, } CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc = %d\n", PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc); + EXIT; out_put: lu_object_put(env, ec_child); return rc; } -static int echo_set_lmm_size(const struct lu_env *env, - struct lu_device *ld, - struct md_attr *ma, int *lmm_size, - int *cookie_size) +static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld, + struct md_attr *ma) { - struct md_device *md = lu2md_dev(ld); - int rc; - ENTRY; - - md = lu2md_dev(ld); - rc = md->md_ops->mdo_maxsize_get(env, md, - lmm_size, cookie_size); - if (rc) - RETURN(rc); - - ma->ma_lmm_size = *lmm_size; - if (*lmm_size > 0) { - OBD_ALLOC(ma->ma_lmm, *lmm_size); - if (ma->ma_lmm == NULL) { - ma->ma_lmm_size = 0; - RETURN(-ENOMEM); - } - } + struct echo_thread_info *info = echo_env_info(env); - ma->ma_cookie_size = *cookie_size; - if (*cookie_size > 0) { - OBD_ALLOC(ma->ma_cookie, *cookie_size); - if (ma->ma_cookie == NULL) { - ma->ma_cookie_size = 0; - RETURN(-ENOMEM); - } + if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) { + ma->ma_lmm = (void *)&info->eti_lmm; + ma->ma_lmm_size = sizeof(info->eti_lmm); + } else { + LASSERT(info->eti_big_lmmsize); + ma->ma_lmm = info->eti_big_lmm; + ma->ma_lmm_size = info->eti_big_lmmsize; } - RETURN(0); + return 0; } static int echo_create_md_object(const struct lu_env *env, @@ -1525,25 +1668,24 @@ static int echo_create_md_object(const struct lu_env *env, struct md_attr *ma = &info->eti_ma; struct lu_device *ld = ed->ed_next; int rc = 0; - int lmm_size = 0; - int cookie_size = 0; int i; + ENTRY; + + if (ec_parent == NULL) + return -1; parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - RETURN(PTR_ERR(parent)); - } + if (parent == NULL) + RETURN(-ENXIO); memset(ma, 0, sizeof(*ma)); memset(spec, 0, sizeof(*spec)); if (stripe_count != 0) { spec->sp_cr_flags |= FMODE_WRITE; - rc = echo_set_lmm_size(env, ld, ma, &lmm_size, &cookie_size); - if (rc) - GOTO(out_free, rc); + echo_set_lmm_size(env, ld, ma); if (stripe_count != -1) { struct lov_user_md_v3 *lum = &info->eti_lum; + lum->lmm_magic = LOV_USER_MAGIC_V3; lum->lmm_stripe_count = stripe_count; lum->lmm_stripe_offset = stripe_offset; @@ -1555,7 +1697,7 @@ static int echo_create_md_object(const struct lu_env *env, } ma->ma_attr.la_mode = mode; - ma->ma_attr.la_valid = LA_CTIME; + ma->ma_attr.la_valid = LA_CTIME | LA_MODE; ma->ma_attr.la_ctime = cfs_time_current_64(); if (name != NULL) { @@ -1564,7 +1706,7 @@ static int echo_create_md_object(const struct lu_env *env, /* If name is specified, only create one object by name */ rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname, spec, ma); - GOTO(out_free, rc); + RETURN(rc); } /* Create multiple object sequenced by id */ @@ -1584,13 +1726,7 @@ static int echo_create_md_object(const struct lu_env *env, fid->f_oid++; } -out_free: - if (lmm_size > 0 && ma->ma_lmm != NULL) - OBD_FREE(ma->ma_lmm, lmm_size); - if (cookie_size > 0 && ma->ma_cookie != NULL) - OBD_FREE(ma->ma_cookie, cookie_size); - - return rc; + RETURN(rc); } static struct lu_object *echo_md_lookup(const struct lu_env *env, @@ -1631,14 +1767,14 @@ static int echo_setattr_object(const struct lu_env *env, int rc = 0; int i; + ENTRY; + + if (ec_parent == NULL) + return -1; parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - return PTR_ERR(parent); - } + if (parent == NULL) + RETURN(-ENXIO); - buf->lb_buf = info->eti_xattr_buf; - buf->lb_len = sizeof(info->eti_xattr_buf); for (i = 0; i < count; i++) { struct lu_object *ec_child, *child; @@ -1662,10 +1798,13 @@ static int echo_setattr_object(const struct lu_env *env, CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n", PFID(lu_object_fid(child))); + buf->lb_buf = info->eti_xattr_buf; + buf->lb_len = sizeof(info->eti_xattr_buf); + sprintf(name, "%s.test1", XATTR_USER_PREFIX); rc = mo_xattr_set(env, lu2md(child), buf, name, LU_XATTR_CREATE); - if (rc) { + if (rc < 0) { CERROR("Can not setattr child "DFID": rc = %d\n", PFID(lu_object_fid(child)), rc); lu_object_put(env, ec_child); @@ -1676,7 +1815,7 @@ static int echo_setattr_object(const struct lu_env *env, id++; lu_object_put(env, ec_child); } - return rc; + RETURN(rc); } static int echo_getattr_object(const struct lu_env *env, @@ -1691,21 +1830,17 @@ static int echo_getattr_object(const struct lu_env *env, struct md_attr *ma = &info->eti_ma; struct lu_device *ld = ed->ed_next; int rc = 0; - int lmm_size = 0; - int cookie_size = 0; int i; + ENTRY; + + if (ec_parent == NULL) + return -1; parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - return PTR_ERR(parent); - } + if (parent == NULL) + RETURN(-ENXIO); memset(ma, 0, sizeof(*ma)); - rc = echo_set_lmm_size(env, ld, ma, &lmm_size, &cookie_size); - if (rc) - GOTO(out_free, rc); - ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF; ma->ma_acl = info->eti_xattr_buf; ma->ma_acl_size = sizeof(info->eti_xattr_buf); @@ -1715,6 +1850,7 @@ static int echo_getattr_object(const struct lu_env *env, ma->ma_valid = 0; echo_md_build_name(lname, name, id); + echo_set_lmm_size(env, ld, ma); ec_child = echo_md_lookup(env, ed, lu2md(parent), lname); if (IS_ERR(ec_child)) { @@ -1727,12 +1863,12 @@ static int echo_getattr_object(const struct lu_env *env, if (child == NULL) { CERROR("Can not locate the child %s\n", lname->ln_name); lu_object_put(env, ec_child); - GOTO(out_free, rc = -EINVAL); + RETURN(-EINVAL); } CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n", PFID(lu_object_fid(child))); - rc = mo_attr_get(env, lu2md(child), ma); + rc = echo_attr_get_complex(env, lu2md(child), ma); if (rc) { CERROR("Can not getattr child "DFID": rc = %d\n", PFID(lu_object_fid(child)), rc); @@ -1745,12 +1881,7 @@ static int echo_getattr_object(const struct lu_env *env, lu_object_put(env, ec_child); } -out_free: - if (lmm_size > 0 && ma->ma_lmm != NULL) - OBD_FREE(ma->ma_lmm, lmm_size); - if (cookie_size > 0 && ma->ma_cookie != NULL) - OBD_FREE(ma->ma_cookie, cookie_size); - return rc; + RETURN(rc); } static int echo_lookup_object(const struct lu_env *env, @@ -1767,11 +1898,11 @@ static int echo_lookup_object(const struct lu_env *env, int rc = 0; int i; + if (ec_parent == NULL) + return -1; parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - return PTR_ERR(parent); - } + if (parent == NULL) + return -ENXIO; /*prepare the requests*/ for (i = 0; i < count; i++) { @@ -1804,6 +1935,8 @@ static int echo_md_destroy_internal(const struct lu_env *env, struct lu_object *child; int rc; + ENTRY; + ec_child = echo_md_lookup(env, ed, parent, lname); if (IS_ERR(ec_child)) { CERROR("Can't find child %s: rc = %ld\n", lname->ln_name, @@ -1820,14 +1953,6 @@ static int echo_md_destroy_internal(const struct lu_env *env, CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n", PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent); -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 55, 0) - /* After 2.4, MDT will send destroy RPC to OST directly, so no need - * this flag */ - ma->ma_valid |= MA_FLAGS; - ma->ma_attr_flags |= MDS_UNLINK_DESTROY; -#else -#warning "Please remove this after 2.4 (LOD/OSP)" -#endif rc = mdo_unlink(env, parent, lu2md(child), lname, ma); if (rc) { CERROR("Can not unlink child %s: rc = %d\n", @@ -1854,8 +1979,6 @@ static int echo_destroy_object(const struct lu_env *env, struct lu_device *ld = ed->ed_next; struct lu_object *parent; int rc = 0; - int lmm_size = 0; - int cookie_size = 0; int i; ENTRY; @@ -1870,22 +1993,18 @@ static int echo_destroy_object(const struct lu_env *env, ma->ma_need = MA_INODE; ma->ma_valid = 0; - rc = echo_set_lmm_size(env, ld, ma, &lmm_size, &cookie_size); - if (rc) - GOTO(out_free, rc); if (name != NULL) { lname->ln_name = name; lname->ln_namelen = namelen; rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname, ma); - GOTO(out_free, rc); + RETURN(rc); } /*prepare the requests*/ for (i = 0; i < count; i++) { char *tmp_name = info->eti_name; - ma->ma_need |= MA_LOV; ma->ma_valid = 0; echo_md_build_name(lname, tmp_name, id); @@ -1898,11 +2017,6 @@ static int echo_destroy_object(const struct lu_env *env, id++; } -out_free: - if (lmm_size > 0 && ma->ma_lmm != NULL) - OBD_FREE(ma->ma_lmm, lmm_size); - if (cookie_size > 0 && ma->ma_cookie != NULL) - OBD_FREE(ma->ma_cookie, cookie_size); RETURN(rc); } @@ -1975,12 +2089,40 @@ static struct lu_object *echo_resolve_path(const struct lu_env *env, RETURN(parent); } +static void echo_ucred_init(struct lu_env *env) +{ + struct lu_ucred *ucred = lu_ucred(env); + + ucred->uc_valid = UCRED_INVALID; + + ucred->uc_suppgids[0] = -1; + ucred->uc_suppgids[1] = -1; + + ucred->uc_uid = ucred->uc_o_uid = cfs_curproc_uid(); + ucred->uc_gid = ucred->uc_o_gid = cfs_curproc_gid(); + ucred->uc_fsuid = ucred->uc_o_fsuid = cfs_curproc_fsuid(); + ucred->uc_fsgid = ucred->uc_o_fsgid = cfs_curproc_fsgid(); + ucred->uc_cap = cfs_curproc_cap_pack(); + + /* remove fs privilege for non-root user. */ + if (ucred->uc_fsuid) + ucred->uc_cap &= ~CFS_CAP_FS_MASK; + ucred->uc_valid = UCRED_NEW; +} + +static void echo_ucred_fini(struct lu_env *env) +{ + struct lu_ucred *ucred = lu_ucred(env); + ucred->uc_valid = UCRED_INIT; +} + #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD) #define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION) static int echo_md_handler(struct echo_device *ed, int command, char *path, int path_len, int id, int count, struct obd_ioctl_data *data) { + struct echo_thread_info *info; struct lu_device *ld = ed->ed_next; struct lu_env *env; int refcheck; @@ -2005,29 +2147,34 @@ static int echo_md_handler(struct echo_device *ed, int command, RETURN(PTR_ERR(env)); rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG); - if (rc != 0) { - cl_env_put(env, &refcheck); - RETURN(rc); - } + if (rc != 0) + GOTO(out_env, rc); + + /* init big_lmm buffer */ + info = echo_env_info(env); + LASSERT(info->eti_big_lmm == NULL); + OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE); + if (info->eti_big_lmm == NULL) + GOTO(out_env, rc = -ENOMEM); + info->eti_big_lmmsize = MIN_MD_SIZE; parent = echo_resolve_path(env, ed, path, path_len); if (IS_ERR(parent)) { CERROR("Can not resolve the path %s: rc = %ld\n", path, PTR_ERR(parent)); - cl_env_put(env, &refcheck); - RETURN(PTR_ERR(parent)); + GOTO(out_free, rc = PTR_ERR(parent)); } if (namelen > 0) { OBD_ALLOC(name, namelen + 1); if (name == NULL) - RETURN(-ENOMEM); - if (cfs_copy_from_user(name, data->ioc_pbuf2, namelen)) { - OBD_FREE(name, namelen + 1); - RETURN(-EFAULT); - } + GOTO(out_put, rc = -ENOMEM); + if (cfs_copy_from_user(name, data->ioc_pbuf2, namelen)) + GOTO(out_name, rc = -EFAULT); } + echo_ucred_init(env); + switch (command) { case ECHO_MD_CREATE: case ECHO_MD_MKDIR: { @@ -2070,9 +2217,19 @@ static int echo_md_handler(struct echo_device *ed, int command, rc = -EINVAL; break; } + echo_ucred_fini(env); + +out_name: if (name != NULL) OBD_FREE(name, namelen + 1); +out_put: lu_object_put(env, parent); +out_free: + LASSERT(info->eti_big_lmm); + OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize); + info->eti_big_lmm = NULL; + info->eti_big_lmmsize = 0; +out_env: cl_env_put(env, &refcheck); return rc; } @@ -2095,7 +2252,7 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, RETURN(-EINVAL); } - rc = obd_alloc_memmd(ec->ec_exp, &lsm); + rc = echo_alloc_memmd(ed, &lsm); if (rc < 0) { CERROR("Cannot allocate md: rc = %d\n", rc); GOTO(failed, rc); @@ -2162,7 +2319,7 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, if (created && rc) obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL); if (lsm) - obd_free_memmd(ec->ec_exp, &lsm); + echo_free_memmd(ed, &lsm); if (rc) CERROR("create object failed with: rc = %d\n", rc); return (rc); @@ -2171,7 +2328,6 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, struct obdo *oa) { - struct echo_client_obd *ec = ed->ed_ec; struct lov_stripe_md *lsm = NULL; struct echo_object *eco; int rc; @@ -2184,7 +2340,7 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, RETURN(-EINVAL); } - rc = obd_alloc_memmd(ec->ec_exp, &lsm); + rc = echo_alloc_memmd(ed, &lsm); if (rc < 0) RETURN(rc); @@ -2201,7 +2357,7 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, else rc = PTR_ERR(eco); if (lsm) - obd_free_memmd(ec->ec_exp, &lsm); + echo_free_memmd(ed, &lsm); RETURN(rc); } @@ -2400,11 +2556,12 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, RETURN(rc); } -static int echo_client_prep_commit(struct obd_export *exp, int rw, - struct obdo *oa, struct echo_object *eco, - obd_off offset, obd_size count, - obd_size batch, struct obd_trans_info *oti, - int async) +static int echo_client_prep_commit(const struct lu_env *env, + struct obd_export *exp, int rw, + struct obdo *oa, struct echo_object *eco, + obd_off offset, obd_size count, + obd_size batch, struct obd_trans_info *oti, + int async) { struct lov_stripe_md *lsm = eco->eo_lsm; struct obd_ioobj ioo; @@ -2412,7 +2569,8 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, struct niobuf_remote *rnb; obd_off off; obd_size npages, tot_pages; - int i, ret = 0; + int i, ret = 0, brw_flags = 0; + ENTRY; if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 || @@ -2428,6 +2586,9 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, if (lnb == NULL || rnb == NULL) GOTO(out, ret = -ENOMEM); + if (rw == OBD_BRW_WRITE && async) + brw_flags |= OBD_BRW_ASYNC; + obdo_to_ioobj(oa, &ioo); off = offset; @@ -2441,13 +2602,14 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) { rnb[i].offset = off; rnb[i].len = CFS_PAGE_SIZE; + rnb[i].flags = brw_flags; } ioo.ioo_bufcnt = npages; oti->oti_transno = 0; lpages = npages; - ret = obd_preprw(NULL, rw, exp, oa, 1, &ioo, rnb, &lpages, + ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages, lnb, oti, NULL); if (ret != 0) GOTO(out, ret); @@ -2480,8 +2642,8 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, rnb[i].len); } - ret = obd_commitrw(NULL, rw, exp, oa, 1, &ioo, - rnb, npages, lnb, oti, ret); + ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, + rnb, npages, lnb, oti, ret); if (ret != 0) GOTO(out, ret); @@ -2497,13 +2659,14 @@ out: RETURN(ret); } -static int echo_client_brw_ioctl(int rw, struct obd_export *exp, - struct obd_ioctl_data *data) +static int echo_client_brw_ioctl(const struct lu_env *env, int rw, + struct obd_export *exp, + struct obd_ioctl_data *data, + struct obd_trans_info *dummy_oti) { struct obd_device *obd = class_exp2obd(exp); struct echo_device *ed = obd2echo_dev(obd); struct echo_client_obd *ec = ed->ed_ec; - struct obd_trans_info dummy_oti = { 0 }; struct obdo *oa = &data->ioc_obdo1; struct echo_object *eco; int rc; @@ -2519,7 +2682,7 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp, oa->o_valid &= ~OBD_MD_FLHANDLE; - /* obdfilter doesn't support obd_brw now, simulate via prep + commit */ + /* OFD/obdfilter works only via prep/commit */ test_mode = (long)data->ioc_pbuf1; if (test_mode == 1) async = 0; @@ -2539,13 +2702,13 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp, case 2: rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset, - data->ioc_count, async, &dummy_oti); + data->ioc_count, async, dummy_oti); break; case 3: - rc = echo_client_prep_commit(ec->ec_exp, rw, oa, - eco, data->ioc_offset, - data->ioc_count, data->ioc_plen1, - &dummy_oti, async); + rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, + eco, data->ioc_offset, + data->ioc_count, data->ioc_plen1, + dummy_oti, async); break; default: rc = -EINVAL; @@ -2711,11 +2874,11 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, if (cfs_copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1)) return -EFAULT; - max_count = LUSTRE_SEQ_MAX_WIDTH; - if (cfs_copy_to_user(data->ioc_pbuf2, &max_count, - data->ioc_plen2)) - return -EFAULT; - GOTO(out, rc); + max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH; + if (cfs_copy_to_user(data->ioc_pbuf2, &max_count, + data->ioc_plen2)) + return -EFAULT; + GOTO(out, rc); } case OBD_IOC_DESTROY: if (!cfs_capable(CFS_CAP_SYS_ADMIN)) @@ -2764,7 +2927,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rw = OBD_BRW_WRITE; /* fall through */ case OBD_IOC_BRW_READ: - rc = echo_client_brw_ioctl(rw, exp, data); + rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti); GOTO(out, rc); case ECHO_IOC_GET_STRIPE: @@ -2850,7 +3013,7 @@ static int echo_client_setup(const struct lu_env *env, RETURN(-EINVAL); } - cfs_spin_lock_init (&ec->ec_lock); + spin_lock_init(&ec->ec_lock); CFS_INIT_LIST_HEAD (&ec->ec_objects); CFS_INIT_LIST_HEAD (&ec->ec_locks); ec->ec_unique = 0; @@ -2871,16 +3034,16 @@ static int echo_client_setup(const struct lu_env *env, ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL | OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 | - OBD_CONNECT_64BITHASH; + OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE; ocd->ocd_version = LUSTRE_VERSION_CODE; ocd->ocd_group = FID_SEQ_ECHO; rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL); if (rc == 0) { /* Turn off pinger because it connects to tgt obd directly. */ - cfs_spin_lock(&tgt->obd_dev_lock); - cfs_list_del_init(&ec->ec_exp->exp_obd_chain_timed); - cfs_spin_unlock(&tgt->obd_dev_lock); + spin_lock(&tgt->obd_dev_lock); + cfs_list_del_init(&ec->ec_exp->exp_obd_chain_timed); + spin_unlock(&tgt->obd_dev_lock); } OBD_FREE(ocd, sizeof(*ocd));