X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdecho%2Fecho_client.c;h=41e6214005965e300701e8c9d56eaa8597ed097b;hp=e6471c87b2dc848e4e689707542fe391213a71de;hb=a08b190a278e952e1c303eb49233b542189656e6;hpb=9f516c2214db4dc515ac817ec3379a9765202d3e diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index e6471c8..41e6214 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -1,399 +1,1418 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (c) 2001-2003 Cluster File Systems, Inc. + * GPL HEADER START * - * This file is part of Lustre, http://www.lustre.org. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. + * + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. */ #define DEBUG_SUBSYSTEM S_ECHO #ifdef __KERNEL__ -#include -#include -#include -#include -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) -#include -#endif -#include +#include #else #include -#include #endif -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include -static obd_id last_object_id; +#include "echo_internal.h" + +/** \defgroup echo_client Echo Client + * @{ + */ + +struct echo_device { + struct cl_device ed_cl; + struct echo_client_obd *ed_ec; + + struct cl_site ed_site_myself; + struct cl_site *ed_site; + struct lu_device *ed_next; + int ed_next_islov; +}; + +struct echo_object { + struct cl_object eo_cl; + struct cl_object_header eo_hdr; + + struct echo_device *eo_dev; + cfs_list_t eo_obj_chain; + struct lov_stripe_md *eo_lsm; + cfs_atomic_t eo_npages; + int eo_deleted; +}; + +struct echo_object_conf { + struct cl_object_conf eoc_cl; + struct lov_stripe_md **eoc_md; +}; + +struct echo_page { + struct cl_page_slice ep_cl; + cfs_mutex_t ep_lock; + cfs_page_t *ep_vmpage; +}; + +struct echo_lock { + struct cl_lock_slice el_cl; + cfs_list_t el_chain; + struct echo_object *el_object; + __u64 el_cookie; + cfs_atomic_t el_refcount; +}; + +struct echo_io { + struct cl_io_slice ei_cl; +}; #if 0 -static void -echo_printk_object (char *msg, struct ec_object *eco) +struct echo_req { + struct cl_req_slice er_cl; +}; +#endif + +static int echo_client_setup(struct obd_device *obddev, + struct lustre_cfg *lcfg); +static int echo_client_cleanup(struct obd_device *obddev); + + +/** \defgroup echo_helpers Helper functions + * @{ + */ +static inline struct echo_device *cl2echo_dev(const struct cl_device *dev) +{ + return container_of0(dev, struct echo_device, ed_cl); +} + +static inline struct cl_device *echo_dev2cl(struct echo_device *d) +{ + return &d->ed_cl; +} + +static inline struct echo_device *obd2echo_dev(const struct obd_device *obd) { - struct lov_stripe_md *lsm = eco->eco_lsm; - int i; + return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev)); +} + +static inline struct cl_object *echo_obj2cl(struct echo_object *eco) +{ + return &eco->eo_cl; +} + +static inline struct echo_object *cl2echo_obj(const struct cl_object *o) +{ + return container_of(o, struct echo_object, eo_cl); +} - printk (KERN_INFO "Lustre: %s: object %p: "LPX64", refs %d%s: "LPX64 - "=%u!%u\n", msg, eco, eco->eco_id, eco->eco_refcount, - eco->eco_deleted ? "(deleted) " : "", - lsm->lsm_object_id, lsm->lsm_stripe_size, - lsm->lsm_stripe_count); +static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s) +{ + return container_of(s, struct echo_page, ep_cl); +} + +static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s) +{ + return container_of(s, struct echo_lock, el_cl); +} + +static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl) +{ + return ecl->el_cl.cls_lock; +} + +static struct lu_context_key echo_thread_key; +static inline struct echo_thread_info *echo_env_info(const struct lu_env *env) +{ + struct echo_thread_info *info; + info = lu_context_key_get(&env->le_ctx, &echo_thread_key); + LASSERT(info != NULL); + return info; +} + +static inline +struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c) +{ + return container_of(c, struct echo_object_conf, eoc_cl); +} - for (i = 0; i < lsm->lsm_stripe_count; i++) - printk (KERN_INFO "Lustre: @%2u:"LPX64"\n", - lsm->lsm_oinfo[i].loi_ost_idx, - lsm->lsm_oinfo[i].loi_id); +static inline void lsm2fid(struct lov_stripe_md *lsm, struct lu_fid *fid) +{ + fid_zero(fid); + fid->f_seq = FID_SEQ_ECHO; + /* truncated to 32 bits by assignment */ + fid->f_oid = lsm->lsm_object_id; + fid->f_ver = lsm->lsm_object_id >> 32; } +/** @} echo_helpers */ + +static struct echo_object *cl_echo_object_find(struct echo_device *d, + struct lov_stripe_md **lsm); +static int cl_echo_object_put(struct echo_object *eco); +static int cl_echo_enqueue (struct echo_object *eco, obd_off start, + obd_off end, int mode, __u64 *cookie); +static int cl_echo_cancel (struct echo_device *d, __u64 cookie); +static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset, + cfs_page_t **pages, int npages, int async); + +static struct echo_thread_info *echo_env_info(const struct lu_env *env); + +struct echo_thread_info { + struct echo_object_conf eti_conf; + struct lustre_md eti_md; + + struct cl_2queue eti_queue; + struct cl_io eti_io; + struct cl_lock_descr eti_descr; + struct lu_fid eti_fid; +}; + +/* No session used right now */ +struct echo_session_info { + unsigned long dummy; +}; + +static cfs_mem_cache_t *echo_page_kmem; +static cfs_mem_cache_t *echo_lock_kmem; +static cfs_mem_cache_t *echo_object_kmem; +static cfs_mem_cache_t *echo_thread_kmem; +static cfs_mem_cache_t *echo_session_kmem; +//static cfs_mem_cache_t *echo_req_kmem; + +static struct lu_kmem_descr echo_caches[] = { + { + .ckd_cache = &echo_page_kmem, + .ckd_name = "echo_page_kmem", + .ckd_size = sizeof (struct echo_page) + }, + { + .ckd_cache = &echo_lock_kmem, + .ckd_name = "echo_lock_kmem", + .ckd_size = sizeof (struct echo_lock) + }, + { + .ckd_cache = &echo_object_kmem, + .ckd_name = "echo_object_kmem", + .ckd_size = sizeof (struct echo_object) + }, + { + .ckd_cache = &echo_thread_kmem, + .ckd_name = "echo_thread_kmem", + .ckd_size = sizeof (struct echo_thread_info) + }, + { + .ckd_cache = &echo_session_kmem, + .ckd_name = "echo_session_kmem", + .ckd_size = sizeof (struct echo_session_info) + }, +#if 0 + { + .ckd_cache = &echo_req_kmem, + .ckd_name = "echo_req_kmem", + .ckd_size = sizeof (struct echo_req) + }, #endif + { + .ckd_cache = NULL + } +}; + +/** \defgroup echo_page Page operations + * + * Echo page operations. + * + * @{ + */ +cfs_page_t *echo_page_vmpage(const struct lu_env *env, + const struct cl_page_slice *slice) +{ + return cl2echo_page(slice)->ep_vmpage; +} + +static int echo_page_own(const struct lu_env *env, + const struct cl_page_slice *slice, + struct cl_io *io, int nonblock) +{ + struct echo_page *ep = cl2echo_page(slice); + + if (!nonblock) + cfs_mutex_lock(&ep->ep_lock); + else if (!cfs_mutex_trylock(&ep->ep_lock)) + return -EAGAIN; + return 0; +} + +static void echo_page_disown(const struct lu_env *env, + const struct cl_page_slice *slice, + struct cl_io *io) +{ + struct echo_page *ep = cl2echo_page(slice); + + LASSERT(cfs_mutex_is_locked(&ep->ep_lock)); + cfs_mutex_unlock(&ep->ep_lock); +} + +static void echo_page_discard(const struct lu_env *env, + const struct cl_page_slice *slice, + struct cl_io *unused) +{ + cl_page_delete(env, slice->cpl_page); +} + +static int echo_page_is_vmlocked(const struct lu_env *env, + const struct cl_page_slice *slice) +{ + return cfs_mutex_is_locked(&cl2echo_page(slice)->ep_lock); +} + +static void echo_page_completion(const struct lu_env *env, + const struct cl_page_slice *slice, + int ioret) +{ + LASSERT(slice->cpl_page->cp_sync_io != NULL); +} + +static void echo_page_fini(const struct lu_env *env, + struct cl_page_slice *slice) +{ + struct echo_page *ep = cl2echo_page(slice); + struct echo_object *eco = cl2echo_obj(slice->cpl_obj); + cfs_page_t *vmpage = ep->ep_vmpage; + ENTRY; + + cfs_atomic_dec(&eco->eo_npages); + page_cache_release(vmpage); + OBD_SLAB_FREE_PTR(ep, echo_page_kmem); + EXIT; +} + +static int echo_page_prep(const struct lu_env *env, + const struct cl_page_slice *slice, + struct cl_io *unused) +{ + return 0; +} + +static int echo_page_print(const struct lu_env *env, + const struct cl_page_slice *slice, + void *cookie, lu_printer_t printer) +{ + struct echo_page *ep = cl2echo_page(slice); + + (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n", + ep, cfs_mutex_is_locked(&ep->ep_lock), ep->ep_vmpage); + return 0; +} + +static const struct cl_page_operations echo_page_ops = { + .cpo_own = echo_page_own, + .cpo_disown = echo_page_disown, + .cpo_discard = echo_page_discard, + .cpo_vmpage = echo_page_vmpage, + .cpo_fini = echo_page_fini, + .cpo_print = echo_page_print, + .cpo_is_vmlocked = echo_page_is_vmlocked, + .io = { + [CRT_READ] = { + .cpo_prep = echo_page_prep, + .cpo_completion = echo_page_completion, + }, + [CRT_WRITE] = { + .cpo_prep = echo_page_prep, + .cpo_completion = echo_page_completion, + } + } +}; +/** @} echo_page */ + +/** \defgroup echo_lock Locking + * + * echo lock operations + * + * @{ + */ +static void echo_lock_fini(const struct lu_env *env, + struct cl_lock_slice *slice) +{ + struct echo_lock *ecl = cl2echo_lock(slice); + + LASSERT(cfs_list_empty(&ecl->el_chain)); + OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem); +} -static struct ec_object * -echo_find_object_locked (struct obd_device *obd, obd_id id) +static void echo_lock_delete(const struct lu_env *env, + const struct cl_lock_slice *slice) { - struct echo_client_obd *ec = &obd->u.echo_client; - struct ec_object *eco = NULL; - struct list_head *el; + struct echo_lock *ecl = cl2echo_lock(slice); + + LASSERT(cfs_list_empty(&ecl->el_chain)); +} + +static int echo_lock_fits_into(const struct lu_env *env, + const struct cl_lock_slice *slice, + const struct cl_lock_descr *need, + const struct cl_io *unused) +{ + return 1; +} + +static struct cl_lock_operations echo_lock_ops = { + .clo_fini = echo_lock_fini, + .clo_delete = echo_lock_delete, + .clo_fits_into = echo_lock_fits_into +}; - list_for_each (el, &ec->ec_objects) { - eco = list_entry (el, struct ec_object, eco_obj_chain); +/** @} echo_lock */ - if (eco->eco_id == id) - return (eco); +/** \defgroup echo_cl_ops cl_object operations + * + * operations for cl_object + * + * @{ + */ +static struct cl_page *echo_page_init(const struct lu_env *env, + struct cl_object *obj, + struct cl_page *page, cfs_page_t *vmpage) +{ + struct echo_page *ep; + ENTRY; + + OBD_SLAB_ALLOC_PTR_GFP(ep, echo_page_kmem, CFS_ALLOC_IO); + if (ep != NULL) { + struct echo_object *eco = cl2echo_obj(obj); + ep->ep_vmpage = vmpage; + page_cache_get(vmpage); + cfs_mutex_init(&ep->ep_lock); + cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops); + cfs_atomic_inc(&eco->eo_npages); } - return (NULL); + RETURN(ERR_PTR(ep ? 0 : -ENOMEM)); } -static int -echo_copyout_lsm (struct lov_stripe_md *lsm, void *ulsm, int ulsm_nob) +static int echo_io_init(const struct lu_env *env, struct cl_object *obj, + struct cl_io *io) { - int nob; + return 0; +} - nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]); - if (nob > ulsm_nob) - return (-EINVAL); +static int echo_lock_init(const struct lu_env *env, + struct cl_object *obj, struct cl_lock *lock, + const struct cl_io *unused) +{ + struct echo_lock *el; + ENTRY; - if (copy_to_user (ulsm, lsm, nob)) - return (-EFAULT); + OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, CFS_ALLOC_IO); + if (el != NULL) { + cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops); + el->el_object = cl2echo_obj(obj); + CFS_INIT_LIST_HEAD(&el->el_chain); + cfs_atomic_set(&el->el_refcount, 0); + } + RETURN(el == NULL ? -ENOMEM : 0); +} - return (0); +static int echo_conf_set(const struct lu_env *env, struct cl_object *obj, + const struct cl_object_conf *conf) +{ + return 0; } -static int -echo_copyin_lsm (struct obd_device *obd, struct lov_stripe_md *lsm, - void *ulsm, int ulsm_nob) +static const struct cl_object_operations echo_cl_obj_ops = { + .coo_page_init = echo_page_init, + .coo_lock_init = echo_lock_init, + .coo_io_init = echo_io_init, + .coo_conf_set = echo_conf_set +}; +/** @} echo_cl_ops */ + +/** \defgroup echo_lu_ops lu_object operations + * + * operations for echo lu object. + * + * @{ + */ +static int echo_object_init(const struct lu_env *env, struct lu_object *obj, + const struct lu_object_conf *conf) { - struct echo_client_obd *ec = &obd->u.echo_client; - int nob; + const struct cl_object_conf *cconf = lu2cl_conf(conf); + struct echo_object_conf *econf = cl2echo_conf(cconf); + struct echo_device *ed = cl2echo_dev(lu2cl_dev(obj->lo_dev)); + struct echo_client_obd *ec = ed->ed_ec; + struct echo_object *eco = cl2echo_obj(lu2cl(obj)); + ENTRY; - if (ulsm_nob < sizeof (*lsm)) - return (-EINVAL); + if (ed->ed_next) { + struct lu_object *below; + struct lu_device *under; - if (copy_from_user (lsm, ulsm, sizeof (*lsm))) - return (-EFAULT); + under = ed->ed_next; + below = under->ld_ops->ldo_object_alloc(env, obj->lo_header, + under); + if (below == NULL) + RETURN(-ENOMEM); + lu_object_add(obj, below); + } - nob = lsm->lsm_stripe_count * sizeof (lsm->lsm_oinfo[0]); + LASSERT(econf->eoc_md); + eco->eo_lsm = *econf->eoc_md; + eco->eo_dev = ed; + cfs_atomic_set(&eco->eo_npages, 0); - if (ulsm_nob < nob || - lsm->lsm_stripe_count > ec->ec_nstripes || - lsm->lsm_magic != LOV_MAGIC || - (lsm->lsm_stripe_size & (PAGE_SIZE - 1)) != 0 || - ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL)) - return (-EINVAL); + /* clear the lsm pointer so that it won't get freed. */ + *econf->eoc_md = NULL; - if (copy_from_user(lsm->lsm_oinfo, - ((struct lov_stripe_md *)ulsm)->lsm_oinfo, nob)) - return (-EFAULT); + cfs_spin_lock(&ec->ec_lock); + cfs_list_add_tail(&eco->eo_obj_chain, &ec->ec_objects); + cfs_spin_unlock(&ec->ec_lock); - return (0); + RETURN(0); +} + +static void echo_object_free(const struct lu_env *env, struct lu_object *obj) +{ + struct echo_object *eco = cl2echo_obj(lu2cl(obj)); + struct echo_client_obd *ec = eco->eo_dev->ed_ec; + struct lov_stripe_md *lsm = eco->eo_lsm; + ENTRY; + + LASSERT(cfs_atomic_read(&eco->eo_npages) == 0); + + cfs_spin_lock(&ec->ec_lock); + cfs_list_del_init(&eco->eo_obj_chain); + cfs_spin_unlock(&ec->ec_lock); + + lu_object_fini(obj); + lu_object_header_fini(obj->lo_header); + + if (lsm) + obd_free_memmd(ec->ec_exp, &lsm); + OBD_SLAB_FREE_PTR(eco, echo_object_kmem); + EXIT; +} + +static int echo_object_print(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct lu_object *o) +{ + struct echo_object *obj = cl2echo_obj(lu2cl(o)); + + return (*p)(env, cookie, "echoclient-object@%p", obj); } -static struct ec_object * -echo_allocate_object (struct obd_device *obd) + +static const struct lu_object_operations echo_lu_obj_ops = { + .loo_object_init = echo_object_init, + .loo_object_delete = NULL, + .loo_object_release = NULL, + .loo_object_free = echo_object_free, + .loo_object_print = echo_object_print, + .loo_object_invariant = NULL +}; +/** @} echo_lu_ops */ + +/** \defgroup echo_lu_dev_ops lu_device operations + * + * Operations for echo lu device. + * + * @{ + */ +static struct lu_object *echo_object_alloc(const struct lu_env *env, + const struct lu_object_header *hdr, + struct lu_device *dev) +{ + struct echo_object *eco; + struct lu_object *obj = NULL; + ENTRY; + + /* we're the top dev. */ + LASSERT(hdr == NULL); + OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, CFS_ALLOC_IO); + if (eco != NULL) { + struct cl_object_header *hdr = &eco->eo_hdr; + + obj = &echo_obj2cl(eco)->co_lu; + cl_object_header_init(hdr); + lu_object_init(obj, &hdr->coh_lu, dev); + lu_object_add_top(&hdr->coh_lu, obj); + + eco->eo_cl.co_ops = &echo_cl_obj_ops; + obj->lo_ops = &echo_lu_obj_ops; + } + RETURN(obj); +} + +static struct lu_device_operations echo_device_lu_ops = { + .ldo_object_alloc = echo_object_alloc, +}; +/** @} echo_lu_dev_ops */ + +static struct cl_device_operations echo_device_cl_ops = { +}; + +/** \defgroup echo_init Setup and teardown + * + * Init and fini functions for echo client. + * + * @{ + */ +static int echo_site_init(const struct lu_env *env, struct echo_device *ed) { - struct echo_client_obd *ec = &obd->u.echo_client; - struct ec_object *eco; + struct cl_site *site = &ed->ed_site_myself; int rc; - OBD_ALLOC(eco, sizeof (*eco)); - if (eco == NULL) - return NULL; + /* initialize site */ + rc = cl_site_init(site, &ed->ed_cl); + if (rc) { + CERROR("Cannot initilize site for echo client(%d)\n", rc); + return rc; + } - rc = obd_alloc_memmd(ec->ec_exp, &eco->eco_lsm); - if (rc < 0) { - OBD_FREE(eco, sizeof (*eco)); - return NULL; + rc = lu_site_init_finish(&site->cs_lu); + if (rc) + return rc; + + ed->ed_site = site; + return 0; +} + +static void echo_site_fini(const struct lu_env *env, struct echo_device *ed) +{ + if (ed->ed_site) { + cl_site_fini(ed->ed_site); + ed->ed_site = NULL; } +} - eco->eco_device = obd; - eco->eco_deleted = 0; - eco->eco_refcount = 0; - eco->eco_lsm->lsm_magic = LOV_MAGIC; - /* leave stripe count 0 by default */ +static void *echo_thread_key_init(const struct lu_context *ctx, + struct lu_context_key *key) +{ + struct echo_thread_info *info; - return (eco); + OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, CFS_ALLOC_IO); + if (info == NULL) + info = ERR_PTR(-ENOMEM); + return info; } -static void -echo_free_object (struct ec_object *eco) +static void echo_thread_key_fini(const struct lu_context *ctx, + struct lu_context_key *key, void *data) { - struct obd_device *obd = eco->eco_device; - struct echo_client_obd *ec = &obd->u.echo_client; + struct echo_thread_info *info = data; + OBD_SLAB_FREE_PTR(info, echo_thread_kmem); +} - LASSERT (eco->eco_refcount == 0); - obd_free_memmd(ec->ec_exp, &eco->eco_lsm); - OBD_FREE (eco, sizeof (*eco)); +static void echo_thread_key_exit(const struct lu_context *ctx, + struct lu_context_key *key, void *data) +{ } -static int echo_create_object(struct obd_device *obd, int on_target, - struct obdo *oa, void *ulsm, int ulsm_nob, - struct obd_trans_info *oti) +static struct lu_context_key echo_thread_key = { + .lct_tags = LCT_CL_THREAD, + .lct_init = echo_thread_key_init, + .lct_fini = echo_thread_key_fini, + .lct_exit = echo_thread_key_exit +}; + +static void *echo_session_key_init(const struct lu_context *ctx, + struct lu_context_key *key) { - struct echo_client_obd *ec = &obd->u.echo_client; - struct ec_object *eco2; - struct ec_object *eco; - struct lov_stripe_md *lsm; - int rc; - int i, idx; + struct echo_session_info *session; - if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */ - (on_target || /* set_stripe */ - ec->ec_nstripes != 0)) { /* LOV */ - CERROR ("No valid oid\n"); - return (-EINVAL); + OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, CFS_ALLOC_IO); + if (session == NULL) + session = ERR_PTR(-ENOMEM); + return session; +} + +static void echo_session_key_fini(const struct lu_context *ctx, + struct lu_context_key *key, void *data) +{ + struct echo_session_info *session = data; + OBD_SLAB_FREE_PTR(session, echo_session_kmem); +} + +static void echo_session_key_exit(const struct lu_context *ctx, + struct lu_context_key *key, void *data) +{ +} + +static struct lu_context_key echo_session_key = { + .lct_tags = LCT_SESSION, + .lct_init = echo_session_key_init, + .lct_fini = echo_session_key_fini, + .lct_exit = echo_session_key_exit +}; + +LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key); + +static struct lu_device *echo_device_alloc(const struct lu_env *env, + struct lu_device_type *t, + struct lustre_cfg *cfg) +{ + struct lu_device *next; + struct echo_device *ed; + struct cl_device *cd; + struct obd_device *obd = NULL; /* to keep compiler happy */ + struct obd_device *tgt; + const char *tgt_type_name; + int rc; + int cleanup = 0; + ENTRY; + + OBD_ALLOC_PTR(ed); + if (ed == NULL) + GOTO(out, rc = -ENOMEM); + + cleanup = 1; + cd = &ed->ed_cl; + rc = cl_device_init(cd, t); + if (rc) + GOTO(out, rc); + + cd->cd_lu_dev.ld_ops = &echo_device_lu_ops; + cd->cd_ops = &echo_device_cl_ops; + + cleanup = 2; + rc = echo_site_init(env, ed); + if (rc) + GOTO(out, rc); + + cleanup = 3; + obd = class_name2obd(lustre_cfg_string(cfg, 0)); + LASSERT(obd != NULL); + rc = echo_client_setup(obd, cfg); + if (rc) + GOTO(out, rc); + ed->ed_ec = &obd->u.echo_client; + + cleanup = 4; + tgt = class_name2obd(lustre_cfg_string(cfg, 1)); + LASSERT(tgt != NULL); + next = tgt->obd_lu_dev; + if (next != NULL && !lu_device_is_cl(next)) + next = NULL; + + /* + * if echo client is to be stacked upon ost device, the next is NULL + * since ost is not a clio device so far + */ + tgt_type_name = tgt->obd_type->typ_name; + if (next != NULL) { + LASSERT(next != NULL); + if (next->ld_site != NULL) + GOTO(out, rc = -EBUSY); + + next->ld_site = &ed->ed_site->cs_lu; + rc = next->ld_type->ldt_ops->ldto_device_init(env, next, + next->ld_type->ldt_name, NULL); + if (rc) + GOTO(out, rc); + + /* Trikcy case, I have to determine the obd type since clio + * uses the different parameters to initialize objects for + * lov & osc. + */ + if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0) + ed->ed_next_islov = 1; + else + LASSERT(strcmp(tgt_type_name, LUSTRE_OSC_NAME) == 0); + } else + LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0); + + ed->ed_next = next; + RETURN(&cd->cd_lu_dev); + +out: + switch(cleanup) { + case 4: { + int rc2; + rc2 = echo_client_cleanup(obd); + if (rc2) + CERROR("Cleanup obd device %s error(%d)\n", + obd->obd_name, rc2); } - if (ulsm != NULL) { - eco = echo_allocate_object (obd); - if (eco == NULL) - return (-ENOMEM); + case 3: + echo_site_fini(env, ed); + case 2: + cl_device_fini(&ed->ed_cl); + case 1: + OBD_FREE_PTR(ed); + case 0: + default: + break; + } + return(ERR_PTR(rc)); +} - lsm = eco->eco_lsm; +static int echo_device_init(const struct lu_env *env, struct lu_device *d, + const char *name, struct lu_device *next) +{ + LBUG(); + return 0; +} - rc = echo_copyin_lsm (obd, lsm, ulsm, ulsm_nob); - if (rc != 0) - goto failed; +static struct lu_device *echo_device_fini(const struct lu_env *env, + struct lu_device *d) +{ + struct echo_device *ed = cl2echo_dev(lu2cl_dev(d)); + struct lu_device *next = ed->ed_next; - /* setup object ID here for !on_target and LOV hint */ - if ((oa->o_valid & OBD_MD_FLID) != 0) - eco->eco_id = lsm->lsm_object_id = oa->o_id; + while (next) + next = next->ld_type->ldt_ops->ldto_device_fini(env, next); + return NULL; +} - if (lsm->lsm_stripe_count == 0) - lsm->lsm_stripe_count = ec->ec_nstripes; +static void echo_lock_release(const struct lu_env *env, + struct echo_lock *ecl, + int still_used) +{ + struct cl_lock *clk = echo_lock2cl(ecl); + + cl_lock_get(clk); + cl_unuse(env, clk); + cl_lock_release(env, clk, "ec enqueue", ecl->el_object); + if (!still_used) { + cl_lock_mutex_get(env, clk); + cl_lock_cancel(env, clk); + cl_lock_delete(env, clk); + cl_lock_mutex_put(env, clk); + } + cl_lock_put(env, clk); +} - if (lsm->lsm_stripe_size == 0) - lsm->lsm_stripe_size = PAGE_SIZE; +static struct lu_device *echo_device_free(const struct lu_env *env, + struct lu_device *d) +{ + struct echo_device *ed = cl2echo_dev(lu2cl_dev(d)); + struct echo_client_obd *ec = ed->ed_ec; + struct echo_object *eco; + struct lu_device *next = ed->ed_next; - idx = ll_insecure_random_int(); + CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n", + ed, next); - /* setup stripes: indices + default ids if required */ - for (i = 0; i < lsm->lsm_stripe_count; i++) { - if (lsm->lsm_oinfo[i].loi_id == 0) - lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id; + LASSERT(ed->ed_site); + lu_site_purge(env, &ed->ed_site->cs_lu, -1); - lsm->lsm_oinfo[i].loi_ost_idx = - (idx + i) % ec->ec_nstripes; + /* check if there are objects still alive. + * It shouldn't have any object because lu_site_purge would cleanup + * all of cached objects. Anyway, probably the echo device is being + * parallelly accessed. + */ + cfs_spin_lock(&ec->ec_lock); + cfs_list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain) + eco->eo_deleted = 1; + cfs_spin_unlock(&ec->ec_lock); + + /* purge again */ + lu_site_purge(env, &ed->ed_site->cs_lu, -1); + + CDEBUG(D_INFO, + "Waiting for the reference of echo object to be dropped\n"); + + /* Wait for the last reference to be dropped. */ + cfs_spin_lock(&ec->ec_lock); + while (!cfs_list_empty(&ec->ec_objects)) { + cfs_spin_unlock(&ec->ec_lock); + CERROR("echo_client still has objects at cleanup time, " + "wait for 1 second\n"); + cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT, + cfs_time_seconds(1)); + cfs_spin_lock(&ec->ec_lock); + } + cfs_spin_unlock(&ec->ec_lock); + + LASSERT(cfs_list_empty(&ec->ec_locks)); + + CDEBUG(D_INFO, "No object exists, exiting...\n"); + + echo_client_cleanup(d->ld_obd); + + while (next) + next = next->ld_type->ldt_ops->ldto_device_free(env, next); + + LASSERT(ed->ed_site == lu2cl_site(d->ld_site)); + echo_site_fini(env, ed); + cl_device_fini(&ed->ed_cl); + OBD_FREE_PTR(ed); + + return NULL; +} + +static const struct lu_device_type_operations echo_device_type_ops = { + .ldto_init = echo_type_init, + .ldto_fini = echo_type_fini, + + .ldto_start = echo_type_start, + .ldto_stop = echo_type_stop, + + .ldto_device_alloc = echo_device_alloc, + .ldto_device_free = echo_device_free, + .ldto_device_init = echo_device_init, + .ldto_device_fini = echo_device_fini +}; + +static struct lu_device_type echo_device_type = { + .ldt_tags = LU_DEVICE_CL, + .ldt_name = LUSTRE_ECHO_CLIENT_NAME, + .ldt_ops = &echo_device_type_ops, + .ldt_ctx_tags = LCT_CL_THREAD +}; +/** @} echo_init */ + +/** \defgroup echo_exports Exported operations + * + * exporting functions to echo client + * + * @{ + */ + +/* Interfaces to echo client obd device */ +static struct echo_object *cl_echo_object_find(struct echo_device *d, + struct lov_stripe_md **lsmp) +{ + struct lu_env *env; + struct echo_thread_info *info; + struct echo_object_conf *conf; + struct lov_stripe_md *lsm; + struct echo_object *eco; + struct cl_object *obj; + struct lu_fid *fid; + int refcheck; + ENTRY; + + LASSERT(lsmp); + lsm = *lsmp; + LASSERT(lsm); + LASSERT(lsm->lsm_object_id); + + /* Never return an object if the obd is to be freed. */ + if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping) + RETURN(ERR_PTR(-ENODEV)); + + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN((void *)env); + + info = echo_env_info(env); + conf = &info->eti_conf; + if (d->ed_next) { + if (!d->ed_next_islov) { + struct lov_oinfo *oinfo = lsm->lsm_oinfo[0]; + LASSERT(oinfo != NULL); + oinfo->loi_id = lsm->lsm_object_id; + oinfo->loi_seq = lsm->lsm_object_seq; + conf->eoc_cl.u.coc_oinfo = oinfo; + } else { + struct lustre_md *md; + md = &info->eti_md; + memset(md, 0, sizeof *md); + md->lsm = lsm; + conf->eoc_cl.u.coc_md = md; } - } else { - OBD_ALLOC(eco, sizeof(*eco)); - eco->eco_device = obd; - lsm = NULL; } + conf->eoc_md = lsmp; - if (oa->o_id == 0) - oa->o_id = ++last_object_id; + fid = &info->eti_fid; + lsm2fid(lsm, fid); - if (on_target) { - /* XXX get some filter group constants */ - oa->o_gr = 2; - oa->o_valid |= OBD_MD_FLGROUP; - rc = obd_create(ec->ec_exp, oa, &lsm, oti); - if (rc != 0) - goto failed; + obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl); + if (IS_ERR(obj)) + GOTO(out, eco = (void*)obj); + + eco = cl2echo_obj(obj); + if (eco->eo_deleted) { + cl_object_put(env, obj); + eco = ERR_PTR(-EAGAIN); + } - /* See what object ID we were given */ - eco->eco_id = oa->o_id = lsm->lsm_object_id; - oa->o_valid |= OBD_MD_FLID; +out: + cl_env_put(env, &refcheck); + RETURN(eco); +} - LASSERT(eco->eco_lsm == NULL || eco->eco_lsm == lsm); - eco->eco_lsm = lsm; +static int cl_echo_object_put(struct echo_object *eco) +{ + struct lu_env *env; + struct cl_object *obj = echo_obj2cl(eco); + int refcheck; + ENTRY; + + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + /* an external function to kill an object? */ + if (eco->eo_deleted) { + struct lu_object_header *loh = obj->co_lu.lo_header; + LASSERT(&eco->eo_hdr == luh2coh(loh)); + cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags); } - spin_lock (&ec->ec_lock); + cl_object_put(env, obj); + cl_env_put(env, &refcheck); + RETURN(0); +} + +static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco, + obd_off start, obd_off end, int mode, + __u64 *cookie , __u32 enqflags) +{ + struct cl_io *io; + struct cl_lock *lck; + struct cl_object *obj; + struct cl_lock_descr *descr; + struct echo_thread_info *info; + int rc = -ENOMEM; + ENTRY; - eco2 = echo_find_object_locked (obd, oa->o_id); - if (eco2 != NULL) { /* conflict */ - spin_unlock (&ec->ec_lock); + info = echo_env_info(env); + io = &info->eti_io; + descr = &info->eti_descr; + obj = echo_obj2cl(eco); - CERROR ("Can't create object id "LPX64": id already exists%s\n", - oa->o_id, on_target ? " (undoing create)" : ""); + descr->cld_obj = obj; + descr->cld_start = cl_index(obj, start); + descr->cld_end = cl_index(obj, end); + descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ; + descr->cld_enq_flags = enqflags; + io->ci_obj = obj; - if (on_target) - obd_destroy(ec->ec_exp, oa, lsm, oti); + lck = cl_lock_request(env, io, descr, "ec enqueue", eco); + if (lck) { + struct echo_client_obd *ec = eco->eo_dev->ed_ec; + struct echo_lock *el; - rc = -EEXIST; - goto failed; + rc = cl_wait(env, lck); + if (rc == 0) { + el = cl2echo_lock(cl_lock_at(lck, &echo_device_type)); + cfs_spin_lock(&ec->ec_lock); + if (cfs_list_empty(&el->el_chain)) { + cfs_list_add(&el->el_chain, &ec->ec_locks); + el->el_cookie = ++ec->ec_unique; + } + cfs_atomic_inc(&el->el_refcount); + *cookie = el->el_cookie; + cfs_spin_unlock(&ec->ec_lock); + } else + cl_lock_release(env, lck, "ec enqueue", cfs_current()); } + RETURN(rc); +} - list_add (&eco->eco_obj_chain, &ec->ec_objects); - spin_unlock (&ec->ec_lock); - CDEBUG (D_INFO, - "created %p: "LPX64"=%u#%u@%u refs %d del %d\n", - eco, eco->eco_id, - eco->eco_lsm->lsm_stripe_size, - eco->eco_lsm->lsm_stripe_count, - eco->eco_lsm->lsm_oinfo[0].loi_ost_idx, - eco->eco_refcount, eco->eco_deleted); - return (0); +static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end, + int mode, __u64 *cookie) +{ + struct echo_thread_info *info; + struct lu_env *env; + struct cl_io *io; + int refcheck; + int result; + ENTRY; - failed: - echo_free_object (eco); - return (rc); + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + info = echo_env_info(env); + io = &info->eti_io; + + result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco)); + if (result < 0) + GOTO(out, result); + LASSERT(result == 0); + + result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0); + cl_io_fini(env, io); + + EXIT; +out: + cl_env_put(env, &refcheck); + return result; } -static int -echo_get_object (struct ec_object **ecop, struct obd_device *obd, - struct obdo *oa) +static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed, + __u64 cookie) { - struct echo_client_obd *ec = &obd->u.echo_client; - struct ec_object *eco; - struct ec_object *eco2; - int rc; + struct echo_client_obd *ec = ed->ed_ec; + struct echo_lock *ecl = NULL; + cfs_list_t *el; + int found = 0, still_used = 0; + ENTRY; - if ((oa->o_valid & OBD_MD_FLID) == 0 || - oa->o_id == 0) /* disallow use of object id 0 */ - { - CERROR ("No valid oid\n"); - return (-EINVAL); + LASSERT(ec != NULL); + cfs_spin_lock (&ec->ec_lock); + cfs_list_for_each (el, &ec->ec_locks) { + ecl = cfs_list_entry (el, struct echo_lock, el_chain); + CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie); + found = (ecl->el_cookie == cookie); + if (found) { + if (cfs_atomic_dec_and_test(&ecl->el_refcount)) + cfs_list_del_init(&ecl->el_chain); + else + still_used = 1; + break; + } } + cfs_spin_unlock (&ec->ec_lock); - spin_lock (&ec->ec_lock); - eco = echo_find_object_locked (obd, oa->o_id); - if (eco != NULL) { - if (eco->eco_deleted) /* being deleted */ - return (-EAGAIN); /* (see comment in cleanup) */ + if (!found) + RETURN(-ENOENT); - eco->eco_refcount++; - spin_unlock (&ec->ec_lock); - *ecop = eco; - CDEBUG (D_INFO, - "found %p: "LPX64"=%u#%u@%u refs %d del %d\n", - eco, eco->eco_id, - eco->eco_lsm->lsm_stripe_size, - eco->eco_lsm->lsm_stripe_count, - eco->eco_lsm->lsm_oinfo[0].loi_ost_idx, - eco->eco_refcount, eco->eco_deleted); - return (0); + echo_lock_release(env, ecl, still_used); + RETURN(0); +} + +static int cl_echo_cancel(struct echo_device *ed, __u64 cookie) +{ + struct lu_env *env; + int refcheck; + int rc; + ENTRY; + + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + rc = cl_echo_cancel0(env, ed, cookie); + + cl_env_put(env, &refcheck); + RETURN(rc); +} + +static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io, + enum cl_req_type unused, struct cl_2queue *queue) +{ + struct cl_page *clp; + struct cl_page *temp; + int result = 0; + ENTRY; + + cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) { + int rc; + rc = cl_page_cache_add(env, io, clp, CRT_WRITE); + if (rc == 0) + continue; + result = result ?: rc; } - spin_unlock (&ec->ec_lock); + RETURN(result); +} + +static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset, + cfs_page_t **pages, int npages, int async) +{ + struct lu_env *env; + struct echo_thread_info *info; + struct cl_object *obj = echo_obj2cl(eco); + struct echo_device *ed = eco->eo_dev; + struct cl_2queue *queue; + struct cl_io *io; + struct cl_page *clp; + struct lustre_handle lh = { 0 }; + int page_size = cl_page_size(obj); + int refcheck; + int rc; + int i; + ENTRY; - if (ec->ec_nstripes != 0) /* striping required */ - return (-ENOENT); + LASSERT((offset & ~CFS_PAGE_MASK) == 0); + LASSERT(ed->ed_next != NULL); + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); - eco = echo_allocate_object (obd); - if (eco == NULL) - return (-ENOMEM); + info = echo_env_info(env); + io = &info->eti_io; + queue = &info->eti_queue; - eco->eco_id = eco->eco_lsm->lsm_object_id = oa->o_id; + cl_2queue_init(queue); + rc = cl_io_init(env, io, CIT_MISC, obj); + if (rc < 0) + GOTO(out, rc); + LASSERT(rc == 0); - spin_lock (&ec->ec_lock); - eco2 = echo_find_object_locked (obd, oa->o_id); - if (eco2 == NULL) { /* didn't race */ - list_add (&eco->eco_obj_chain, &ec->ec_objects); - spin_unlock (&ec->ec_lock); - eco->eco_refcount = 1; - *ecop = eco; - CDEBUG (D_INFO, - "created %p: "LPX64"=%u#%u@%d refs %d del %d\n", - eco, eco->eco_id, - eco->eco_lsm->lsm_stripe_size, - eco->eco_lsm->lsm_stripe_count, - eco->eco_lsm->lsm_oinfo[0].loi_ost_idx, - eco->eco_refcount, eco->eco_deleted); - return (0); + rc = cl_echo_enqueue0(env, eco, offset, + offset + npages * CFS_PAGE_SIZE - 1, + rw == READ ? LCK_PR : LCK_PW, &lh.cookie, + CEF_NEVER); + if (rc < 0) + GOTO(error_lock, rc); + + for (i = 0; i < npages; i++) { + LASSERT(pages[i]); + clp = cl_page_find(env, obj, cl_index(obj, offset), + pages[i], CPT_TRANSIENT); + if (IS_ERR(clp)) { + rc = PTR_ERR(clp); + break; + } + LASSERT(clp->cp_type == CPT_TRANSIENT); + + rc = cl_page_own(env, io, clp); + if (rc) { + LASSERT(clp->cp_state == CPS_FREEING); + cl_page_put(env, clp); + break; + } + + cl_2queue_add(queue, clp); + + /* drop the reference count for cl_page_find, so that the page + * will be freed in cl_2queue_fini. */ + cl_page_put(env, clp); + cl_page_clip(env, clp, 0, page_size); + + offset += page_size; } - if (eco2->eco_deleted) - rc = -EAGAIN; /* lose race */ - else { - eco2->eco_refcount++; /* take existing */ - *ecop = eco2; - rc = 0; - LASSERT (eco2->eco_id == eco2->eco_lsm->lsm_object_id); - CDEBUG (D_INFO, - "found(2) %p: "LPX64"=%u#%u@%d refs %d del %d\n", - eco2, eco2->eco_id, - eco2->eco_lsm->lsm_stripe_size, - eco2->eco_lsm->lsm_stripe_count, - eco2->eco_lsm->lsm_oinfo[0].loi_ost_idx, - eco2->eco_refcount, eco2->eco_deleted); + if (rc == 0) { + enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE; + + async = async && (typ == CRT_WRITE); + if (async) + rc = cl_echo_async_brw(env, io, typ, queue); + else + rc = cl_io_submit_sync(env, io, typ, queue, + CRP_NORMAL, 0); + CDEBUG(D_INFO, "echo_client %s write returns %d\n", + async ? "async" : "sync", rc); } - spin_unlock (&ec->ec_lock); + cl_echo_cancel0(env, ed, lh.cookie); + EXIT; +error_lock: + cl_2queue_discard(env, io, queue); + cl_2queue_disown(env, io, queue); + cl_2queue_fini(env, queue); + cl_io_fini(env, io); +out: + cl_env_put(env, &refcheck); + return rc; +} +/** @} echo_exports */ - echo_free_object (eco); - return (rc); + +static obd_id last_object_id; + +static int +echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob) +{ + struct lov_stripe_md *ulsm = _ulsm; + int nob, i; + + nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]); + if (nob > ulsm_nob) + return (-EINVAL); + + if (cfs_copy_to_user (ulsm, lsm, sizeof(ulsm))) + return (-EFAULT); + + for (i = 0; i < lsm->lsm_stripe_count; i++) { + if (cfs_copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i], + sizeof(lsm->lsm_oinfo[0]))) + return (-EFAULT); + } + return 0; } -static void -echo_put_object (struct ec_object *eco) +static int +echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm, + void *ulsm, int ulsm_nob) { - struct obd_device *obd = eco->eco_device; - struct echo_client_obd *ec = &obd->u.echo_client; + struct echo_client_obd *ec = ed->ed_ec; + int i; + + if (ulsm_nob < sizeof (*lsm)) + return (-EINVAL); + + if (cfs_copy_from_user (lsm, ulsm, sizeof (*lsm))) + return (-EFAULT); + + if (lsm->lsm_stripe_count > ec->ec_nstripes || + lsm->lsm_magic != LOV_MAGIC || + (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 || + ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL)) + return (-EINVAL); + + + for (i = 0; i < lsm->lsm_stripe_count; i++) { + if (cfs_copy_from_user(lsm->lsm_oinfo[i], + ((struct lov_stripe_md *)ulsm)-> \ + lsm_oinfo[i], + sizeof(lsm->lsm_oinfo[0]))) + return (-EFAULT); + } + return (0); +} + +static int echo_create_object(struct echo_device *ed, int on_target, + struct obdo *oa, void *ulsm, int ulsm_nob, + struct obd_trans_info *oti) +{ + struct echo_object *eco; + struct echo_client_obd *ec = ed->ed_ec; + struct lov_stripe_md *lsm = NULL; + int rc; + int created = 0; + ENTRY; + + if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */ + (on_target || /* set_stripe */ + ec->ec_nstripes != 0)) { /* LOV */ + CERROR ("No valid oid\n"); + RETURN(-EINVAL); + } + + rc = obd_alloc_memmd(ec->ec_exp, &lsm); + if (rc < 0) { + CERROR("Cannot allocate md, rc = %d\n", rc); + GOTO(failed, rc); + } + + if (ulsm != NULL) { + int i, idx; + + rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob); + if (rc != 0) + GOTO(failed, rc); + + if (lsm->lsm_stripe_count == 0) + lsm->lsm_stripe_count = ec->ec_nstripes; + + if (lsm->lsm_stripe_size == 0) + lsm->lsm_stripe_size = CFS_PAGE_SIZE; + + idx = cfs_rand(); + + /* setup stripes: indices + default ids if required */ + for (i = 0; i < lsm->lsm_stripe_count; i++) { + if (lsm->lsm_oinfo[i]->loi_id == 0) + lsm->lsm_oinfo[i]->loi_id = lsm->lsm_object_id; + + lsm->lsm_oinfo[i]->loi_ost_idx = + (idx + i) % ec->ec_nstripes; + } + } + + /* setup object ID here for !on_target and LOV hint */ + if (oa->o_valid & OBD_MD_FLID) + lsm->lsm_object_id = oa->o_id; + + if (lsm->lsm_object_id == 0) + lsm->lsm_object_id = ++last_object_id; + + rc = 0; + if (on_target) { + /* Only echo objects are allowed to be created */ + LASSERT((oa->o_valid & OBD_MD_FLGROUP) && + (oa->o_seq == FID_SEQ_ECHO)); + rc = obd_create(ec->ec_exp, oa, &lsm, oti); + if (rc != 0) { + CERROR("Cannot create objects, rc = %d\n", rc); + GOTO(failed, rc); + } + created = 1; + } + + /* See what object ID we were given */ + oa->o_id = lsm->lsm_object_id; + oa->o_valid |= OBD_MD_FLID; - /* Release caller's ref on the object. - * delete => mark for deletion when last ref goes - */ + eco = cl_echo_object_find(ed, &lsm); + if (IS_ERR(eco)) + GOTO(failed, rc = PTR_ERR(eco)); + cl_echo_object_put(eco); - spin_lock (&ec->ec_lock); + CDEBUG(D_INFO, "oa->o_id = %lx\n", (long)oa->o_id); + EXIT; - eco->eco_refcount--; - LASSERT (eco->eco_refcount >= 0); + failed: + if (created && rc) + obd_destroy(ec->ec_exp, oa, lsm, oti, NULL, NULL); + if (lsm) + obd_free_memmd(ec->ec_exp, &lsm); + if (rc) + CERROR("create object failed with rc = %d\n", rc); + return (rc); +} - CDEBUG(D_INFO, "put %p: "LPX64"=%u#%u@%d refs %d del %d\n", - eco, eco->eco_id, - eco->eco_lsm->lsm_stripe_size, - eco->eco_lsm->lsm_stripe_count, - eco->eco_lsm->lsm_oinfo[0].loi_ost_idx, - eco->eco_refcount, eco->eco_deleted); +static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, + struct obdo *oa) +{ + struct echo_client_obd *ec = ed->ed_ec; + struct lov_stripe_md *lsm = NULL; + struct echo_object *eco; + int rc; + ENTRY; - if (eco->eco_refcount != 0 || !eco->eco_deleted) { - spin_unlock (&ec->ec_lock); - return; + if ((oa->o_valid & OBD_MD_FLID) == 0 || + oa->o_id == 0) /* disallow use of object id 0 */ + { + CERROR ("No valid oid\n"); + RETURN(-EINVAL); } - spin_unlock (&ec->ec_lock); - - /* NB leave obj in the object list. We must prevent anyone from - * attempting to enqueue on this object number until we can be - * sure there will be no more lock callbacks. - */ - obd_cancel_unused(ec->ec_exp, eco->eco_lsm, 0, NULL); + rc = obd_alloc_memmd(ec->ec_exp, &lsm); + if (rc < 0) + RETURN(rc); - /* now we can let it go */ - spin_lock (&ec->ec_lock); - list_del (&eco->eco_obj_chain); - spin_unlock (&ec->ec_lock); + lsm->lsm_object_id = oa->o_id; + if (oa->o_valid & OBD_MD_FLGROUP) + lsm->lsm_object_seq = oa->o_seq; + else + lsm->lsm_object_seq = FID_SEQ_ECHO; - LASSERT (eco->eco_refcount == 0); + rc = 0; + eco = cl_echo_object_find(ed, &lsm); + if (!IS_ERR(eco)) + *ecop = eco; + else + rc = PTR_ERR(eco); + if (lsm) + obd_free_memmd(ec->ec_exp, &lsm); + RETURN(rc); +} - echo_free_object (eco); +static void echo_put_object(struct echo_object *eco) +{ + if (cl_echo_object_put(eco)) + CERROR("echo client: drop an object failed"); } static void @@ -421,13 +1440,13 @@ echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp) stripe_index = woffset / stripe_size; - *idp = lsm->lsm_oinfo[stripe_index].loi_id; + *idp = lsm->lsm_oinfo[stripe_index]->loi_id; *offp = offset * stripe_size + woffset % stripe_size; } -static void -echo_client_page_debug_setup(struct lov_stripe_md *lsm, - struct page *page, int rw, obd_id id, +static void +echo_client_page_debug_setup(struct lov_stripe_md *lsm, + cfs_page_t *page, int rw, obd_id id, obd_off offset, obd_off count) { char *addr; @@ -436,11 +1455,11 @@ echo_client_page_debug_setup(struct lov_stripe_md *lsm, int delta; /* no partial pages on the client */ - LASSERT(count == PAGE_SIZE); + LASSERT(count == CFS_PAGE_SIZE); - addr = kmap(page); + addr = cfs_kmap(page); - for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { + for (delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { if (rw == OBD_BRW_WRITE) { stripe_off = offset + delta; stripe_id = id; @@ -449,17 +1468,16 @@ echo_client_page_debug_setup(struct lov_stripe_md *lsm, stripe_off = 0xdeadbeef00c0ffeeULL; stripe_id = 0xdeadbeef00c0ffeeULL; } - block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE, + block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE, stripe_off, stripe_id); } - kunmap(page); + cfs_kunmap(page); } -static int -echo_client_page_debug_check(struct lov_stripe_md *lsm, - struct page *page, obd_id id, - obd_off offset, obd_off count) +static int echo_client_page_debug_check(struct lov_stripe_md *lsm, + cfs_page_t *page, obd_id id, + obd_off offset, obd_off count) { obd_off stripe_off; obd_id stripe_id; @@ -469,82 +1487,105 @@ echo_client_page_debug_check(struct lov_stripe_md *lsm, int rc2; /* no partial pages on the client */ - LASSERT(count == PAGE_SIZE); + LASSERT(count == CFS_PAGE_SIZE); - addr = kmap(page); + addr = cfs_kmap(page); - for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { + for (rc = delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { stripe_off = offset + delta; stripe_id = id; echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id); - rc2 = block_debug_check("test_brw", - addr + delta, OBD_ECHO_BLOCK_SIZE, + rc2 = block_debug_check("test_brw", + addr + delta, OBD_ECHO_BLOCK_SIZE, stripe_off, stripe_id); - if (rc2 != 0) + if (rc2 != 0) { + CERROR ("Error in echo object "LPX64"\n", id); rc = rc2; + } } - kunmap(page); + cfs_kunmap(page); return rc; } -static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa, - struct lov_stripe_md *lsm, obd_off offset, - obd_size count, struct obd_trans_info *oti) +static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, + struct echo_object *eco, obd_off offset, + obd_size count, int async, + struct obd_trans_info *oti) { - struct echo_client_obd *ec = &obd->u.echo_client; + struct echo_client_obd *ec = ed->ed_ec; + struct lov_stripe_md *lsm = eco->eo_lsm; obd_count npages; struct brw_page *pga; struct brw_page *pgp; + cfs_page_t **pages; obd_off off; int i; int rc; - int verify = 0; + int verify; int gfp_mask; + int brw_flags = 0; + ENTRY; - /* oa_id == ECHO_PERSISTENT_OBJID => speed test (no verification). - * oa & 1 => use HIGHMEM */ + verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID && + (oa->o_valid & OBD_MD_FLFLAGS) != 0 && + (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0); - verify = (oa->o_id) != ECHO_PERSISTENT_OBJID; - gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER; + gfp_mask = ((oa->o_id & 2) == 0) ? CFS_ALLOC_STD : CFS_ALLOC_HIGHUSER; LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ); + LASSERT(lsm != NULL); + LASSERT(lsm->lsm_object_id == oa->o_id); if (count <= 0 || - (count & (PAGE_SIZE - 1)) != 0 || - (lsm != NULL && - lsm->lsm_object_id != oa->o_id)) - return (-EINVAL); + (count & (~CFS_PAGE_MASK)) != 0) + RETURN(-EINVAL); /* XXX think again with misaligned I/O */ - npages = count >> PAGE_SHIFT; + npages = count >> CFS_PAGE_SHIFT; + + if (rw == OBD_BRW_WRITE) + brw_flags = OBD_BRW_ASYNC; OBD_ALLOC(pga, npages * sizeof(*pga)); if (pga == NULL) - return (-ENOMEM); + RETURN(-ENOMEM); + + OBD_ALLOC(pages, npages * sizeof(*pages)); + if (pages == NULL) { + OBD_FREE(pga, npages * sizeof(*pga)); + RETURN(-ENOMEM); + } for (i = 0, pgp = pga, off = offset; i < npages; - i++, pgp++, off += PAGE_SIZE) { + i++, pgp++, off += CFS_PAGE_SIZE) { LASSERT (pgp->pg == NULL); /* for cleanup */ rc = -ENOMEM; - pgp->pg = alloc_pages (gfp_mask, 0); + OBD_PAGE_ALLOC(pgp->pg, gfp_mask); if (pgp->pg == NULL) goto out; - pgp->count = PAGE_SIZE; + pages[i] = pgp->pg; + pgp->count = CFS_PAGE_SIZE; pgp->off = off; - pgp->flag = 0; + pgp->flag = brw_flags; if (verify) - echo_client_page_debug_setup(lsm, pgp->pg, rw, + echo_client_page_debug_setup(lsm, pgp->pg, rw, oa->o_id, off, pgp->count); } - rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti); + if (ed->ed_next == NULL) { + struct obd_info oinfo = { { { 0 } } }; + oinfo.oi_oa = oa; + oinfo.oi_md = lsm; + rc = obd_brw(rw, ec->ec_exp, &oinfo, npages, pga, oti); + } else + rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async); out: if (rc != 0 || rw != OBD_BRW_READ) @@ -561,348 +1602,19 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa, if (vrc != 0 && rc == 0) rc = vrc; } - __free_pages(pgp->pg, 0); - } - OBD_FREE(pga, npages * sizeof(*pga)); - return (rc); -} - -#ifdef __KERNEL__ -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) -static int echo_client_ubrw(struct obd_device *obd, int rw, - struct obdo *oa, struct lov_stripe_md *lsm, - obd_off offset, obd_size count, char *buffer, - struct obd_trans_info *oti) -{ - struct echo_client_obd *ec = &obd->u.echo_client; - obd_count npages; - struct brw_page *pga; - struct brw_page *pgp; - obd_off off; - struct kiobuf *kiobuf; - int i; - int rc; - - LASSERT (rw == OBD_BRW_WRITE || - rw == OBD_BRW_READ); - - /* NB: for now, only whole pages, page aligned */ - - if (count <= 0 || - ((long)buffer & (PAGE_SIZE - 1)) != 0 || - (count & (PAGE_SIZE - 1)) != 0 || - (lsm != NULL && lsm->lsm_object_id != oa->o_id)) - return (-EINVAL); - - /* XXX think again with misaligned I/O */ - npages = count >> PAGE_SHIFT; - - OBD_ALLOC(pga, npages * sizeof(*pga)); - if (pga == NULL) - return (-ENOMEM); - - rc = alloc_kiovec (1, &kiobuf); - if (rc != 0) - goto out_1; - - rc = map_user_kiobuf ((rw == OBD_BRW_READ) ? READ : WRITE, - kiobuf, (unsigned long)buffer, count); - if (rc != 0) - goto out_2; - - LASSERT (kiobuf->offset == 0); - LASSERT (kiobuf->nr_pages == npages); - - for (i = 0, off = offset, pgp = pga; - i < npages; - i++, off += PAGE_SIZE, pgp++) { - pgp->off = off; - pgp->pg = kiobuf->maplist[i]; - pgp->count = PAGE_SIZE; - pgp->flag = 0; + OBD_PAGE_FREE(pgp->pg); } - - rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti); - - // if (rw == OBD_BRW_READ) - // mark_dirty_kiobuf (kiobuf, count); - - unmap_kiobuf (kiobuf); - out_2: - free_kiovec (1, &kiobuf); - out_1: OBD_FREE(pga, npages * sizeof(*pga)); - return (rc); -} -#else -static int echo_client_ubrw(struct obd_device *obd, int rw, - struct obdo *oa, struct lov_stripe_md *lsm, - obd_off offset, obd_size count, char *buffer, - struct obd_trans_info *oti) -{ -#warning "echo_client_ubrw() needs to be ported on 2.6 yet" - LBUG(); - return 0; -} -#endif -#endif - -struct echo_async_state; - -#define EAP_MAGIC 79277927 -struct echo_async_page { - int eap_magic; - struct page *eap_page; - void *eap_cookie; - obd_off eap_off; - struct echo_async_state *eap_eas; - struct list_head eap_item; -}; - -struct echo_async_state { - spinlock_t eas_lock; - obd_off eas_next_offset; - obd_off eas_end_offset; - int eas_in_flight; - int eas_rc; - wait_queue_head_t eas_waitq; - struct list_head eas_avail; - struct obdo eas_oa; - struct lov_stripe_md *eas_lsm; -}; - -static int eas_should_wake(struct echo_async_state *eas) -{ - unsigned long flags; - int rc = 0; - spin_lock_irqsave(&eas->eas_lock, flags); - if (eas->eas_rc == 0 && !list_empty(&eas->eas_avail)) - rc = 1; - spin_unlock_irqrestore(&eas->eas_lock, flags); - return rc; -}; - -struct echo_async_page *eap_from_cookie(void *cookie) -{ - struct echo_async_page *eap = cookie; - if (eap->eap_magic != EAP_MAGIC) - return ERR_PTR(-EINVAL); - return eap; -}; - -static int ec_ap_make_ready(void *data, int cmd) -{ - /* our pages are issued ready */ - LBUG(); - return 0; -} -static int ec_ap_refresh_count(void *data, int cmd) -{ - /* our pages are issued with a stable count */ - LBUG(); - return PAGE_SIZE; -} -static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa) -{ - struct echo_async_page *eap; - eap = eap_from_cookie(data); - if (IS_ERR(eap)) - return; - - memcpy(oa, &eap->eap_eas->eas_oa, sizeof(*oa)); -} - -static void ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc) -{ - struct echo_async_page *eap = eap_from_cookie(data); - struct echo_async_state *eas; - unsigned long flags; - - if (IS_ERR(eap)) - return; - eas = eap->eap_eas; - - if (cmd == OBD_BRW_READ && - eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID) - echo_client_page_debug_check(eas->eas_lsm, eap->eap_page, - eas->eas_oa.o_id, eap->eap_off, - PAGE_SIZE); - - spin_lock_irqsave(&eas->eas_lock, flags); - if (rc && !eas->eas_rc) - eas->eas_rc = rc; - eas->eas_in_flight--; - list_add(&eap->eap_item, &eas->eas_avail); - wake_up(&eas->eas_waitq); - spin_unlock_irqrestore(&eas->eas_lock, flags); -} - -static struct obd_async_page_ops ec_async_page_ops = { - .ap_make_ready = ec_ap_make_ready, - .ap_refresh_count = ec_ap_refresh_count, - .ap_fill_obdo = ec_ap_fill_obdo, - .ap_completion = ec_ap_completion, -}; - -static int echo_client_async_page(struct obd_export *exp, int rw, - struct obdo *oa, struct lov_stripe_md *lsm, - obd_off offset, obd_size count, - obd_size batching) -{ - obd_count npages, i; - struct echo_async_page *eap; - struct echo_async_state eas; - struct list_head *pos, *n; - int rc = 0; - unsigned long flags; - LIST_HEAD(pages); -#if 0 - int verify; - int gfp_mask; - /* oa_id == 0 => speed test (no verification) else... - * oa & 1 => use HIGHMEM - */ - verify = (oa->o_id != 0); - gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER; -#endif - - LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ); - - if (count <= 0 || - (count & (PAGE_SIZE - 1)) != 0 || - (lsm != NULL && - lsm->lsm_object_id != oa->o_id)) - return (-EINVAL); - - /* XXX think again with misaligned I/O */ - npages = batching >> PAGE_SHIFT; - - memcpy(&eas.eas_oa, oa, sizeof(*oa)); - eas.eas_next_offset = offset; - eas.eas_end_offset = offset + count; - spin_lock_init(&eas.eas_lock); - init_waitqueue_head(&eas.eas_waitq); - eas.eas_in_flight = 0; - eas.eas_rc = 0; - eas.eas_lsm = lsm; - INIT_LIST_HEAD(&eas.eas_avail); - - /* prepare the group of pages that we're going to be keeping - * in flight */ - for (i = 0; i < npages; i++) { - struct page *page = alloc_page(GFP_KERNEL); - if (page == NULL) - GOTO(out, rc = -ENOMEM); - - page->private = 0; - list_add_tail(&PAGE_LIST(page), &pages); - - OBD_ALLOC(eap, sizeof(*eap)); - if (eap == NULL) - GOTO(out, rc = -ENOMEM); - - eap->eap_magic = EAP_MAGIC; - eap->eap_page = page; - eap->eap_eas = &eas; - page->private = (unsigned long)eap; - list_add_tail(&eap->eap_item, &eas.eas_avail); - } - - /* first we spin queueing io and being woken by its completion */ - spin_lock_irqsave(&eas.eas_lock, flags); - for(;;) { - int rc; - - /* sleep until we have a page to send */ - spin_unlock_irqrestore(&eas.eas_lock, flags); - rc = wait_event_interruptible(eas.eas_waitq, - eas_should_wake(&eas)); - spin_lock_irqsave(&eas.eas_lock, flags); - if (rc && !eas.eas_rc) - eas.eas_rc = rc; - if (eas.eas_rc) - break; - if (list_empty(&eas.eas_avail)) - continue; - eap = list_entry(eas.eas_avail.next, struct echo_async_page, - eap_item); - list_del(&eap->eap_item); - spin_unlock_irqrestore(&eas.eas_lock, flags); - - /* unbind the eap from its old page offset */ - if (eap->eap_cookie != NULL) { - obd_teardown_async_page(exp, lsm, NULL, - eap->eap_cookie); - eap->eap_cookie = NULL; - } - - eas.eas_next_offset += PAGE_SIZE; - eap->eap_off = eas.eas_next_offset; - - rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page, - eap->eap_off, &ec_async_page_ops, - eap, &eap->eap_cookie); - if (rc) { - spin_lock_irqsave(&eas.eas_lock, flags); - eas.eas_rc = rc; - break; - } - - if (oa->o_id != ECHO_PERSISTENT_OBJID) - echo_client_page_debug_setup(lsm, eap->eap_page, rw, - oa->o_id, - eap->eap_off, PAGE_SIZE); - - /* always asserts urgent, which isn't quite right */ - rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie, - rw, 0, PAGE_SIZE, 0, - ASYNC_READY | ASYNC_URGENT | - ASYNC_COUNT_STABLE); - spin_lock_irqsave(&eas.eas_lock, flags); - if (rc && !eas.eas_rc) { - eas.eas_rc = rc; - break; - } - eas.eas_in_flight++; - if (eas.eas_next_offset == eas.eas_end_offset) - break; - } - - /* still hold the eas_lock here.. */ - - /* now we just spin waiting for all the rpcs to complete */ - while(eas.eas_in_flight) { - spin_unlock_irqrestore(&eas.eas_lock, flags); - wait_event_interruptible(eas.eas_waitq, - eas.eas_in_flight == 0); - spin_lock_irqsave(&eas.eas_lock, flags); - } - spin_unlock_irqrestore(&eas.eas_lock, flags); - -out: - list_for_each_safe(pos, n, &pages) { - struct page *page = list_entry(pos, struct page, - PAGE_LIST_ENTRY); - - list_del(&PAGE_LIST(page)); - if (page->private != 0) { - eap = (struct echo_async_page *)page->private; - if (eap->eap_cookie != NULL) - obd_teardown_async_page(exp, lsm, NULL, - eap->eap_cookie); - OBD_FREE(eap, sizeof(*eap)); - } - __free_page(page); - } - + OBD_FREE(pages, npages * sizeof(*pages)); RETURN(rc); } static int echo_client_prep_commit(struct obd_export *exp, int rw, - struct obdo *oa, struct lov_stripe_md *lsm, - obd_off offset, obd_size count, + struct obdo *oa, struct echo_object *eco, + obd_off offset, obd_size count, obd_size batch, struct obd_trans_info *oti) { + struct lov_stripe_md *lsm = eco->eo_lsm; struct obd_ioobj ioo; struct niobuf_local *lnb; struct niobuf_remote *rnb; @@ -911,12 +1623,12 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, int i, ret = 0; ENTRY; - if (count <= 0 || (count & (PAGE_SIZE - 1)) != 0 || + if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 || (lsm != NULL && lsm->lsm_object_id != oa->o_id)) RETURN(-EINVAL); - npages = batch >> PAGE_SHIFT; - tot_pages = count >> PAGE_SHIFT; + npages = batch >> CFS_PAGE_SHIFT; + tot_pages = count >> CFS_PAGE_SHIFT; OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local)); OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote)); @@ -929,30 +1641,36 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, off = offset; for(; tot_pages; tot_pages -= npages) { + int lpages; + if (tot_pages < npages) npages = tot_pages; - for (i = 0; i < npages; i++, off += PAGE_SIZE) { + for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) { rnb[i].offset = off; - rnb[i].len = PAGE_SIZE; + rnb[i].len = CFS_PAGE_SIZE; } - /* XXX this can't be the best.. */ - memset(oti, 0, sizeof(*oti)); ioo.ioo_bufcnt = npages; + oti->oti_transno = 0; - ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti); + lpages = npages; + ret = obd_preprw(rw, exp, oa, 1, &ioo, rnb, &lpages, lnb, oti, + NULL); if (ret != 0) GOTO(out, ret); + LASSERT(lpages == npages); - for (i = 0; i < npages; i++) { - struct page *page = lnb[i].page; + for (i = 0; i < lpages; i++) { + cfs_page_t *page = lnb[i].page; /* read past eof? */ if (page == NULL && lnb[i].rc == 0) continue; - if (oa->o_id == ECHO_PERSISTENT_OBJID) + if (oa->o_id == ECHO_PERSISTENT_OBJID || + (oa->o_valid & OBD_MD_FLFLAGS) == 0 || + (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0) continue; if (rw == OBD_BRW_WRITE) @@ -967,9 +1685,12 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, rnb[i].len); } - ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret); + ret = obd_commitrw(rw, exp, oa, 1,&ioo,rnb,npages,lnb,oti,ret); if (ret != 0) GOTO(out, ret); + + /* Reset oti otherwise it would confuse ldiskfs. */ + memset(oti, 0, sizeof(*oti)); } out: @@ -980,49 +1701,39 @@ out: RETURN(ret); } -int echo_client_brw_ioctl(int rw, struct obd_export *exp, - struct obd_ioctl_data *data) +static int echo_client_brw_ioctl(int rw, struct obd_export *exp, + struct obd_ioctl_data *data) { struct obd_device *obd = class_exp2obd(exp); - struct echo_client_obd *ec = &obd->u.echo_client; - struct obd_trans_info dummy_oti; - struct ec_object *eco; + struct echo_device *ed = obd2echo_dev(obd); + struct echo_client_obd *ec = ed->ed_ec; + struct obd_trans_info dummy_oti = { 0 }; + struct obdo *oa = &data->ioc_obdo1; + struct echo_object *eco; int rc; + int async = 1; ENTRY; - rc = echo_get_object(&eco, obd, &data->ioc_obdo1); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + + rc = echo_get_object(&eco, ed, oa); if (rc) RETURN(rc); - memset(&dummy_oti, 0, sizeof(dummy_oti)); - - data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE; - data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP; - data->ioc_obdo1.o_gr = 2; + oa->o_valid &= ~OBD_MD_FLHANDLE; switch((long)data->ioc_pbuf1) { case 1: - if (data->ioc_pbuf2 == NULL) { // NULL user data pointer - rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1, - eco->eco_lsm, data->ioc_offset, - data->ioc_count, &dummy_oti); - } else { -#ifdef __KERNEL__ - rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1, - eco->eco_lsm, data->ioc_offset, - data->ioc_count, data->ioc_pbuf2, - &dummy_oti); -#endif - } - break; + async = 0; + /* fall through */ case 2: - rc = echo_client_async_page(ec->ec_exp, rw, &data->ioc_obdo1, - eco->eco_lsm, data->ioc_offset, - data->ioc_count, data->ioc_plen1); + rc = echo_client_kbrw(ed, rw, oa, + eco, data->ioc_offset, + data->ioc_count, async, &dummy_oti); break; case 3: - rc = echo_client_prep_commit(ec->ec_exp, rw, &data->ioc_obdo1, - eco->eco_lsm, data->ioc_offset, + rc = echo_client_prep_commit(ec->ec_exp, rw, oa, + eco, data->ioc_offset, data->ioc_count, data->ioc_plen1, &dummy_oti); break; @@ -1034,222 +1745,140 @@ int echo_client_brw_ioctl(int rw, struct obd_export *exp, } static int -echo_ldlm_callback (struct ldlm_lock *lock, struct ldlm_lock_desc *new, - void *data, int flag) -{ - struct ec_object *eco = (struct ec_object *)data; - struct echo_client_obd *ec = &(eco->eco_device->u.echo_client); - struct lustre_handle lockh; - struct list_head *el; - int found = 0; - int rc; - - ldlm_lock2handle (lock, &lockh); - - /* #ifdef this out if we're not feeling paranoid */ - spin_lock (&ec->ec_lock); - list_for_each (el, &ec->ec_objects) { - found = (eco == list_entry(el, struct ec_object, - eco_obj_chain)); - if (found) - break; - } - spin_unlock (&ec->ec_lock); - LASSERT (found); - - switch (flag) { - case LDLM_CB_BLOCKING: - CDEBUG(D_INFO, "blocking callback on "LPX64", handle "LPX64"\n", - eco->eco_id, lockh.cookie); - rc = ldlm_cli_cancel (&lockh); - if (rc != ELDLM_OK) - CERROR ("ldlm_cli_cancel failed: %d\n", rc); - break; - - case LDLM_CB_CANCELING: - CDEBUG(D_INFO, "cancel callback on "LPX64", handle "LPX64"\n", - eco->eco_id, lockh.cookie); - break; - - default: - LBUG (); - } - - return (0); -} - -static int echo_client_enqueue(struct obd_export *exp, struct obdo *oa, int mode, obd_off offset, obd_size nob) { - struct obd_device *obd = exp->exp_obd; - struct echo_client_obd *ec = &obd->u.echo_client; - struct lustre_handle *ulh = obdo_handle (oa); - struct ec_object *eco; - struct ec_lock *ecl; - int flags; + struct echo_device *ed = obd2echo_dev(exp->exp_obd); + struct lustre_handle *ulh = &oa->o_handle; + struct echo_object *eco; + obd_off end; int rc; + ENTRY; - if (!(mode == LCK_PR || mode == LCK_PW)) - return -EINVAL; + if (ed->ed_next == NULL) + RETURN(-EOPNOTSUPP); - if ((offset & (PAGE_SIZE - 1)) != 0 || - (nob & (PAGE_SIZE - 1)) != 0) - return -EINVAL; + if (!(mode == LCK_PR || mode == LCK_PW)) + RETURN(-EINVAL); - rc = echo_get_object (&eco, obd, oa); - if (rc != 0) - return rc; + if ((offset & (~CFS_PAGE_MASK)) != 0 || + (nob & (~CFS_PAGE_MASK)) != 0) + RETURN(-EINVAL); - rc = -ENOMEM; - OBD_ALLOC (ecl, sizeof (*ecl)); - if (ecl == NULL) - goto failed_0; - - ecl->ecl_mode = mode; - ecl->ecl_object = eco; - ecl->ecl_policy.l_extent.start = offset; - ecl->ecl_policy.l_extent.end = - (nob == 0) ? ((obd_off) -1) : (offset + nob - 1); - - flags = 0; - rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, LDLM_EXTENT, - &ecl->ecl_policy, mode, &flags, echo_ldlm_callback, - ldlm_completion_ast, NULL, eco, sizeof(struct ost_lvb), - lustre_swab_ost_lvb, &ecl->ecl_lock_handle); + rc = echo_get_object (&eco, ed, oa); if (rc != 0) - goto failed_1; - - CDEBUG(D_INFO, "enqueue handle "LPX64"\n", ecl->ecl_lock_handle.cookie); - - /* NB ecl takes object ref from echo_get_object() above */ - spin_lock(&ec->ec_lock); - - list_add(&ecl->ecl_exp_chain, &exp->exp_ec_data.eced_locks); - ulh->cookie = ecl->ecl_cookie = ec->ec_unique++; - - spin_unlock(&ec->ec_lock); - - oa->o_valid |= OBD_MD_FLHANDLE; - return 0; + RETURN(rc); - failed_1: - OBD_FREE (ecl, sizeof (*ecl)); - failed_0: - echo_put_object (eco); - return (rc); + end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1); + rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie); + if (rc == 0) { + oa->o_valid |= OBD_MD_FLHANDLE; + CDEBUG(D_INFO, "Cookie is "LPX64"\n", ulh->cookie); + } + echo_put_object(eco); + RETURN(rc); } static int echo_client_cancel(struct obd_export *exp, struct obdo *oa) { - struct obd_device *obd = exp->exp_obd; - struct echo_client_obd *ec = &obd->u.echo_client; - struct lustre_handle *ulh = obdo_handle (oa); - struct ec_lock *ecl = NULL; - int found = 0; - struct list_head *el; - int rc; + struct echo_device *ed = obd2echo_dev(exp->exp_obd); + __u64 cookie = oa->o_handle.cookie; if ((oa->o_valid & OBD_MD_FLHANDLE) == 0) return -EINVAL; - spin_lock (&ec->ec_lock); - - list_for_each (el, &exp->exp_ec_data.eced_locks) { - ecl = list_entry (el, struct ec_lock, ecl_exp_chain); - found = (ecl->ecl_cookie == ulh->cookie); - if (found) { - list_del (&ecl->ecl_exp_chain); - break; - } - } - - spin_unlock (&ec->ec_lock); - - if (!found) - return (-ENOENT); - - rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm, ecl->ecl_mode, - &ecl->ecl_lock_handle); - - echo_put_object (ecl->ecl_object); - OBD_FREE (ecl, sizeof (*ecl)); - - return rc; + CDEBUG(D_INFO, "Cookie is "LPX64"\n", cookie); + return cl_echo_cancel(ed, cookie); } static int echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) { - struct obd_device *obd; - struct echo_client_obd *ec; - struct ec_object *eco; + struct obd_device *obd = exp->exp_obd; + struct echo_device *ed = obd2echo_dev(obd); + struct echo_client_obd *ec = ed->ed_ec; + struct echo_object *eco; struct obd_ioctl_data *data = karg; struct obd_trans_info dummy_oti; struct oti_req_ack_lock *ack_lock; struct obdo *oa; + struct lu_fid fid; int rw = OBD_BRW_READ; int rc = 0; int i; ENTRY; +#ifndef HAVE_UNLOCKED_IOCTL + cfs_unlock_kernel(); +#endif + memset(&dummy_oti, 0, sizeof(dummy_oti)); - obd = exp->exp_obd; - ec = &obd->u.echo_client; + oa = &data->ioc_obdo1; + if (!(oa->o_valid & OBD_MD_FLGROUP)) { + oa->o_valid |= OBD_MD_FLGROUP; + oa->o_seq = FID_SEQ_ECHO; + } + + /* This FID is unpacked just for validation at this point */ + rc = fid_ostid_unpack(&fid, &oa->o_oi, 0); + if (rc < 0) + RETURN(rc); switch (cmd) { case OBD_IOC_CREATE: /* may create echo object */ - if (!capable (CAP_SYS_ADMIN)) + if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); - rc = echo_create_object (obd, 1, &data->ioc_obdo1, + rc = echo_create_object (ed, 1, oa, data->ioc_pbuf1, data->ioc_plen1, &dummy_oti); GOTO(out, rc); case OBD_IOC_DESTROY: - if (!capable (CAP_SYS_ADMIN)) + if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); - rc = echo_get_object (&eco, obd, &data->ioc_obdo1); + rc = echo_get_object (&eco, ed, oa); if (rc == 0) { - oa = &data->ioc_obdo1; - oa->o_gr = 2; - oa->o_valid |= OBD_MD_FLGROUP; - rc = obd_destroy(ec->ec_exp, oa, eco->eco_lsm, - &dummy_oti); + rc = obd_destroy(ec->ec_exp, oa, eco->eo_lsm, + &dummy_oti, NULL, NULL); if (rc == 0) - eco->eco_deleted = 1; + eco->eo_deleted = 1; echo_put_object(eco); } GOTO(out, rc); case OBD_IOC_GETATTR: - rc = echo_get_object (&eco, obd, &data->ioc_obdo1); + rc = echo_get_object (&eco, ed, oa); if (rc == 0) { - rc = obd_getattr(ec->ec_exp, &data->ioc_obdo1, - eco->eco_lsm); + struct obd_info oinfo = { { { 0 } } }; + oinfo.oi_md = eco->eo_lsm; + oinfo.oi_oa = oa; + rc = obd_getattr(ec->ec_exp, &oinfo); echo_put_object(eco); } GOTO(out, rc); case OBD_IOC_SETATTR: - if (!capable (CAP_SYS_ADMIN)) + if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); - rc = echo_get_object (&eco, obd, &data->ioc_obdo1); + rc = echo_get_object (&eco, ed, oa); if (rc == 0) { - rc = obd_setattr(ec->ec_exp, &data->ioc_obdo1, - eco->eco_lsm, NULL); + struct obd_info oinfo = { { { 0 } } }; + oinfo.oi_oa = oa; + oinfo.oi_md = eco->eo_lsm; + + rc = obd_setattr(ec->ec_exp, &oinfo, NULL); echo_put_object(eco); } GOTO(out, rc); case OBD_IOC_BRW_WRITE: - if (!capable (CAP_SYS_ADMIN)) + if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); rw = OBD_BRW_WRITE; @@ -1259,42 +1888,43 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, GOTO(out, rc); case ECHO_IOC_GET_STRIPE: - rc = echo_get_object(&eco, obd, &data->ioc_obdo1); + rc = echo_get_object(&eco, ed, oa); if (rc == 0) { - rc = echo_copyout_lsm(eco->eco_lsm, data->ioc_pbuf1, + rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1, data->ioc_plen1); echo_put_object(eco); } GOTO(out, rc); case ECHO_IOC_SET_STRIPE: - if (!capable (CAP_SYS_ADMIN)) + if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); if (data->ioc_pbuf1 == NULL) { /* unset */ - rc = echo_get_object(&eco, obd, &data->ioc_obdo1); + rc = echo_get_object(&eco, ed, oa); if (rc == 0) { - eco->eco_deleted = 1; + eco->eo_deleted = 1; echo_put_object(eco); } } else { - rc = echo_create_object(obd, 0, &data->ioc_obdo1, + rc = echo_create_object(ed, 0, oa, data->ioc_pbuf1, data->ioc_plen1, &dummy_oti); } GOTO (out, rc); case ECHO_IOC_ENQUEUE: - if (!capable (CAP_SYS_ADMIN)) + if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); - rc = echo_client_enqueue(exp, &data->ioc_obdo1, + rc = echo_client_enqueue(exp, oa, data->ioc_conn1, /* lock mode */ - data->ioc_offset, data->ioc_count);/*extent*/ + data->ioc_offset, + data->ioc_count);/*extent*/ GOTO (out, rc); case ECHO_IOC_CANCEL: - rc = echo_client_cancel(exp, &data->ioc_obdo1); + rc = echo_client_cancel(exp, oa); GOTO (out, rc); default: @@ -1306,119 +1936,137 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, out: /* XXX this should be in a helper also called by target_send_reply */ - for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4; + for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4; i++, ack_lock++) { if (!ack_lock->mode) break; ldlm_lock_decref(&ack_lock->lock, ack_lock->mode); } +#ifndef HAVE_UNLOCKED_IOCTL + cfs_lock_kernel(); +#endif + return rc; } -static int -echo_client_setup(struct obd_device *obddev, obd_count len, void *buf) +static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) { - struct lustre_cfg* lcfg = buf; struct echo_client_obd *ec = &obddev->u.echo_client; struct obd_device *tgt; - struct lustre_handle conn = {0, }; struct obd_uuid echo_uuid = { "ECHO_UUID" }; + struct obd_connect_data *ocd = NULL; int rc; ENTRY; - if (lcfg->lcfg_inllen1 < 1) { + if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) { CERROR("requires a TARGET OBD name\n"); RETURN(-EINVAL); } - tgt = class_name2obd(lcfg->lcfg_inlbuf1); + tgt = class_name2obd(lustre_cfg_string(lcfg, 1)); if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) { CERROR("device not attached or not set up (%s)\n", - lcfg->lcfg_inlbuf1); + lustre_cfg_string(lcfg, 1)); RETURN(-EINVAL); } - spin_lock_init (&ec->ec_lock); - INIT_LIST_HEAD (&ec->ec_objects); + cfs_spin_lock_init (&ec->ec_lock); + CFS_INIT_LIST_HEAD (&ec->ec_objects); + CFS_INIT_LIST_HEAD (&ec->ec_locks); ec->ec_unique = 0; + ec->ec_nstripes = 0; - rc = obd_connect(&conn, tgt, &echo_uuid); - if (rc) { - CERROR("fail to connect to device %s\n", lcfg->lcfg_inlbuf1); + OBD_ALLOC(ocd, sizeof(*ocd)); + if (ocd == NULL) { + CERROR("Can't alloc ocd connecting to %s\n", + lustre_cfg_string(lcfg, 1)); + return -ENOMEM; + } + + ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL | + OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 | + OBD_CONNECT_64BITHASH; + ocd->ocd_version = LUSTRE_VERSION_CODE; + ocd->ocd_group = FID_SEQ_ECHO; + + rc = obd_connect(NULL, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL); + if (rc == 0) { + /* Turn off pinger because it connects to tgt obd directly. */ + cfs_spin_lock(&tgt->obd_dev_lock); + cfs_list_del_init(&ec->ec_exp->exp_obd_chain_timed); + cfs_spin_unlock(&tgt->obd_dev_lock); + } + + OBD_FREE(ocd, sizeof(*ocd)); + + if (rc != 0) { + CERROR("fail to connect to device %s\n", + lustre_cfg_string(lcfg, 1)); return (rc); } - ec->ec_exp = class_conn2export(&conn); RETURN(rc); } -static int echo_client_cleanup(struct obd_device *obddev, int flags) +static int echo_client_cleanup(struct obd_device *obddev) { - struct list_head *el; - struct ec_object *eco; struct echo_client_obd *ec = &obddev->u.echo_client; int rc; ENTRY; - if (!list_empty(&obddev->obd_exports)) { + if (!cfs_list_empty(&obddev->obd_exports)) { CERROR("still has clients!\n"); RETURN(-EBUSY); } - /* XXX assuming sole access */ - while (!list_empty(&ec->ec_objects)) { - el = ec->ec_objects.next; - eco = list_entry(el, struct ec_object, eco_obj_chain); - - LASSERT(eco->eco_refcount == 0); - eco->eco_refcount = 1; - eco->eco_deleted = 1; - echo_put_object(eco); - } - - rc = obd_disconnect(ec->ec_exp, 0); + LASSERT(cfs_atomic_read(&ec->ec_exp->exp_refcount) > 0); + rc = obd_disconnect(ec->ec_exp); if (rc != 0) CERROR("fail to disconnect device: %d\n", rc); RETURN(rc); } -static int echo_client_connect(struct lustre_handle *conn, - struct obd_device *src, struct obd_uuid *cluuid) +static int echo_client_connect(const struct lu_env *env, + struct obd_export **exp, + struct obd_device *src, struct obd_uuid *cluuid, + struct obd_connect_data *data, void *localdata) { - struct obd_export *exp; int rc; + struct lustre_handle conn = { 0 }; - rc = class_connect(conn, src, cluuid); + ENTRY; + rc = class_connect(&conn, src, cluuid); if (rc == 0) { - exp = class_conn2export(conn); - INIT_LIST_HEAD(&exp->exp_ec_data.eced_locks); - class_export_put(exp); + *exp = class_conn2export(&conn); } RETURN (rc); } -static int echo_client_disconnect(struct obd_export *exp, int flags) +static int echo_client_disconnect(struct obd_export *exp) { +#if 0 struct obd_device *obd; struct echo_client_obd *ec; struct ec_lock *ecl; +#endif int rc; ENTRY; if (exp == NULL) GOTO(out, rc = -EINVAL); +#if 0 obd = exp->exp_obd; ec = &obd->u.echo_client; /* no more contention on export's lock list */ - while (!list_empty (&exp->exp_ec_data.eced_locks)) { - ecl = list_entry (exp->exp_ec_data.eced_locks.next, - struct ec_lock, ecl_exp_chain); - list_del (&ecl->ecl_exp_chain); + while (!cfs_list_empty (&exp->exp_ec_data.eced_locks)) { + ecl = cfs_list_entry (exp->exp_ec_data.eced_locks.next, + struct ec_lock, ecl_exp_chain); + cfs_list_del (&ecl->ecl_exp_chain); rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm, ecl->ecl_mode, &ecl->ecl_lock_handle); @@ -1429,8 +2077,9 @@ static int echo_client_disconnect(struct obd_export *exp, int flags) echo_put_object (ecl->ecl_object); OBD_FREE (ecl, sizeof (*ecl)); } +#endif - rc = class_disconnect(exp, 0); + rc = class_disconnect(exp); GOTO(out, rc); out: return rc; @@ -1438,8 +2087,12 @@ static int echo_client_disconnect(struct obd_export *exp, int flags) static struct obd_ops echo_obd_ops = { .o_owner = THIS_MODULE, + +#if 0 .o_setup = echo_client_setup, .o_cleanup = echo_client_cleanup, +#endif + .o_iocontrol = echo_client_iocontrol, .o_connect = echo_client_connect, .o_disconnect = echo_client_disconnect @@ -1447,14 +2100,27 @@ static struct obd_ops echo_obd_ops = { int echo_client_init(void) { - struct lprocfs_static_vars lvars; + struct lprocfs_static_vars lvars = { 0 }; + int rc; + + lprocfs_echo_init_vars(&lvars); - lprocfs_init_vars(echo, &lvars); - return class_register_type(&echo_obd_ops, lvars.module_vars, - OBD_ECHO_CLIENT_DEVICENAME); + rc = lu_kmem_init(echo_caches); + if (rc == 0) + rc = class_register_type(&echo_obd_ops, NULL, + lvars.module_vars, + LUSTRE_ECHO_CLIENT_NAME, + &echo_device_type); + if (rc) + lu_kmem_fini(echo_caches); + + return rc; } void echo_client_exit(void) { - class_unregister_type(OBD_ECHO_CLIENT_DEVICENAME); + class_unregister_type(LUSTRE_ECHO_CLIENT_NAME); + lu_kmem_fini(echo_caches); } + +/** @} echo_client */