4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include <libcfs/libcfs.h>
41 #include <obd_support.h>
42 #include <obd_class.h>
43 #include <lustre_debug.h>
44 #include <lprocfs_status.h>
45 #include <cl_object.h>
46 #include <lustre_fid.h>
47 #include <lustre_acl.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #ifdef HAVE_SERVER_SUPPORT
51 # include <md_object.h>
52 #endif /* HAVE_SERVER_SUPPORT */
54 #include "echo_internal.h"
56 /** \defgroup echo_client Echo Client
/*
 * NOTE(review): this listing is line-sampled (original line numbers are
 * embedded and brace lines are missing), so the structs below appear
 * without their opening/closing lines.  Comments hedged accordingly.
 */
/* echo_device: per-device state; embeds the cl_device at ed_cl */
61 struct cl_device ed_cl;
62 struct echo_client_obd *ed_ec;
/* ed_site points at ed_site_myself when the site is self-hosted
 * (see echo_site_init below) — on an MD stack it aliases the MDS site */
64 struct cl_site ed_site_myself;
65 struct cl_site *ed_site;
66 struct lu_device *ed_next;
68 struct lu_client_seq *ed_cl_seq;
69 #ifdef HAVE_SERVER_SUPPORT
/* local OID storage and the echo root fid exist only with server support */
70 struct local_oid_storage *ed_los;
71 struct lu_fid ed_root_fid;
72 #endif /* HAVE_SERVER_SUPPORT */
/* echo_object: embeds cl_object + its header; chained on ec_objects */
76 struct cl_object eo_cl;
77 struct cl_object_header eo_hdr;
78 struct echo_device *eo_dev;
79 struct list_head eo_obj_chain;
/* ownership of eo_oinfo is transferred in at echo_object_init() */
80 struct lov_oinfo *eo_oinfo;
85 struct echo_object_conf {
86 struct cl_object_conf eoc_cl;
/* double pointer so object init can steal the oinfo and NULL the
 * caller's copy (see echo_object_init / cl_echo_object_find) */
87 struct lov_oinfo **eoc_oinfo;
/* echo_page slice: the ep_lock mutex used below is sampled out here */
91 struct cl_page_slice ep_cl;
/* echo_lock slice: chained on ec_locks, refcounted, cookie-addressed */
96 struct cl_lock_slice el_cl;
97 struct list_head el_chain;
98 struct echo_object *el_object;
100 atomic_t el_refcount;
103 #ifdef HAVE_SERVER_SUPPORT
/* name of the directory the MD echo client creates/uses as its root */
104 static const char echo_md_root_dir_name[] = "ROOT_ECHO";
107 * In order to use the values of members in struct mdd_device,
108 * we define an alias structure here.
/* NOTE(review): as the comment above says, this struct must stay
 * layout-compatible with the head of struct mdd_device — verify the
 * field order against mdd_device before changing anything here */
110 struct echo_md_device {
111 struct md_device emd_md_dev;
112 struct obd_export *emd_child_exp;
113 struct dt_device *emd_child;
114 struct dt_device *emd_bottom;
115 struct lu_fid emd_root_fid;
116 struct lu_fid emd_local_root_fid;
118 #endif /* HAVE_SERVER_SUPPORT */
/* forward declarations: used by echo_device_alloc/free further down */
120 static int echo_client_setup(const struct lu_env *env,
121 struct obd_device *obddev,
122 struct lustre_cfg *lcfg);
123 static int echo_client_cleanup(struct obd_device *obddev);
126 /** \defgroup echo_helpers Helper functions
/* container_of-style converters between the layered object types;
 * container_of0 variants presumably tolerate ERR_PTR/NULL — confirm */
129 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
131 return container_of0(dev, struct echo_device, ed_cl);
134 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
139 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
141 return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
144 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
149 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
151 return container_of(o, struct echo_object, eo_cl);
154 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
156 return container_of(s, struct echo_page, ep_cl);
159 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
161 return container_of(s, struct echo_lock, el_cl);
164 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
166 return ecl->el_cl.cls_lock;
169 static struct lu_context_key echo_thread_key;
/* fetch the per-thread echo_thread_info registered under echo_thread_key */
170 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
172 struct echo_thread_info *info;
173 info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
174 LASSERT(info != NULL);
179 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
181 return container_of(c, struct echo_object_conf, eoc_cl);
184 #ifdef HAVE_SERVER_SUPPORT
/* server-side converters for the mdd_device-aliased echo_md_device */
185 static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
187 return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
190 static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
192 return &d->emd_md_dev.md_lu_dev;
195 static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
197 return emd2lu_dev(d)->ld_site->ld_seq_site;
200 static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
202 return d->emd_md_dev.md_lu_dev.ld_obd;
204 #endif /* HAVE_SERVER_SUPPORT */
206 /** @} echo_helpers */
/* forward declarations for the exported-operations section below */
208 static int cl_echo_object_put(struct echo_object *eco);
209 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
210 struct page **pages, int npages, int async);
/* per-thread scratch state, allocated through echo_thread_key so each
 * service thread reuses one instance instead of stack/heap churn */
212 struct echo_thread_info {
213 struct echo_object_conf eti_conf;
214 struct lustre_md eti_md;
216 struct cl_2queue eti_queue;
218 struct cl_lock eti_lock;
219 struct lu_fid eti_fid;
220 struct lu_fid eti_fid2;
221 #ifdef HAVE_SERVER_SUPPORT
222 struct md_op_spec eti_spec;
223 struct lov_mds_md_v3 eti_lmm;
224 struct lov_user_md_v3 eti_lum;
225 struct md_attr eti_ma;
226 struct lu_name eti_lname;
227 /* per-thread values, can be re-used */
231 struct lu_buf eti_buf;
232 char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
236 /* No session used right now */
237 struct echo_session_info {
/* slab caches backing the lock/object/thread/session allocations */
241 static struct kmem_cache *echo_lock_kmem;
242 static struct kmem_cache *echo_object_kmem;
243 static struct kmem_cache *echo_thread_kmem;
244 static struct kmem_cache *echo_session_kmem;
245 /* static struct kmem_cache *echo_req_kmem; */
/* descriptor table — presumably consumed by lu_kmem_init/fini at
 * module load/unload (registration call not visible in this view) */
247 static struct lu_kmem_descr echo_caches[] = {
249 .ckd_cache = &echo_lock_kmem,
250 .ckd_name = "echo_lock_kmem",
251 .ckd_size = sizeof (struct echo_lock)
254 .ckd_cache = &echo_object_kmem,
255 .ckd_name = "echo_object_kmem",
256 .ckd_size = sizeof (struct echo_object)
259 .ckd_cache = &echo_thread_kmem,
260 .ckd_name = "echo_thread_kmem",
261 .ckd_size = sizeof (struct echo_thread_info)
264 .ckd_cache = &echo_session_kmem,
265 .ckd_name = "echo_session_kmem",
266 .ckd_size = sizeof (struct echo_session_info)
273 /** \defgroup echo_page Page operations
275 * Echo page operations.
/* own: blocking mutex_lock vs. mutex_trylock on the nonblock path
 * (the branch condition itself is sampled out of this listing) */
279 static int echo_page_own(const struct lu_env *env,
280 const struct cl_page_slice *slice,
281 struct cl_io *io, int nonblock)
283 struct echo_page *ep = cl2echo_page(slice);
286 mutex_lock(&ep->ep_lock);
287 else if (!mutex_trylock(&ep->ep_lock))
/* disown: caller must hold ep_lock (asserted) */
292 static void echo_page_disown(const struct lu_env *env,
293 const struct cl_page_slice *slice,
296 struct echo_page *ep = cl2echo_page(slice);
298 LASSERT(mutex_is_locked(&ep->ep_lock));
299 mutex_unlock(&ep->ep_lock);
/* discard simply deletes the page from the cl page cache */
302 static void echo_page_discard(const struct lu_env *env,
303 const struct cl_page_slice *slice,
304 struct cl_io *unused)
306 cl_page_delete(env, slice->cpl_page);
/* "vmlocked" is approximated by whether ep_lock is currently held */
309 static int echo_page_is_vmlocked(const struct lu_env *env,
310 const struct cl_page_slice *slice)
312 if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
/* completion: only sanity-checks that a sync-io anchor is attached */
317 static void echo_page_completion(const struct lu_env *env,
318 const struct cl_page_slice *slice,
321 LASSERT(slice->cpl_page->cp_sync_io != NULL);
/* fini: undo echo_page_init — drop the vmpage ref, decrement eo_npages */
324 static void echo_page_fini(const struct lu_env *env,
325 struct cl_page_slice *slice)
327 struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
330 atomic_dec(&eco->eo_npages);
331 page_cache_release(slice->cpl_page->cp_vmpage);
335 static int echo_page_prep(const struct lu_env *env,
336 const struct cl_page_slice *slice,
337 struct cl_io *unused)
342 static int echo_page_print(const struct lu_env *env,
343 const struct cl_page_slice *slice,
344 void *cookie, lu_printer_t printer)
346 struct echo_page *ep = cl2echo_page(slice);
348 (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
349 ep, mutex_is_locked(&ep->ep_lock),
350 slice->cpl_page->cp_vmpage);
/* vtable: the duplicated cpo_prep/cpo_completion entries below are
 * presumably the per-CRT (read/write) io arms — the wrapping ".io"
 * initializer lines are sampled out of this listing */
354 static const struct cl_page_operations echo_page_ops = {
355 .cpo_own = echo_page_own,
356 .cpo_disown = echo_page_disown,
357 .cpo_discard = echo_page_discard,
358 .cpo_fini = echo_page_fini,
359 .cpo_print = echo_page_print,
360 .cpo_is_vmlocked = echo_page_is_vmlocked,
363 .cpo_prep = echo_page_prep,
364 .cpo_completion = echo_page_completion,
367 .cpo_prep = echo_page_prep,
368 .cpo_completion = echo_page_completion,
374 /** \defgroup echo_lock Locking
376 * echo lock operations
/* free an echo_lock back to its slab; it must already be off ec_locks */
380 static void echo_lock_fini(const struct lu_env *env,
381 struct cl_lock_slice *slice)
383 struct echo_lock *ecl = cl2echo_lock(slice);
385 LASSERT(list_empty(&ecl->el_chain));
386 OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
389 static struct cl_lock_operations echo_lock_ops = {
390 .clo_fini = echo_lock_fini,
395 /** \defgroup echo_cl_ops cl_object operations
397 * operations for cl_object
/* page slice init: pin the vmpage, init ep_lock, attach the slice and
 * account the page in eo_npages (released again in echo_page_fini) */
401 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
402 struct cl_page *page, pgoff_t index)
404 struct echo_page *ep = cl_object_page_slice(obj, page);
405 struct echo_object *eco = cl2echo_obj(obj);
408 page_cache_get(page->cp_vmpage);
409 mutex_init(&ep->ep_lock);
410 cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
411 atomic_inc(&eco->eo_npages);
415 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
/* lock slice init: allocate an echo_lock; -ENOMEM when the slab
 * allocation fails (the success-branch guard is sampled out) */
421 static int echo_lock_init(const struct lu_env *env,
422 struct cl_object *obj, struct cl_lock *lock,
423 const struct cl_io *unused)
425 struct echo_lock *el;
428 OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
430 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
431 el->el_object = cl2echo_obj(obj);
432 INIT_LIST_HEAD(&el->el_chain);
433 atomic_set(&el->el_refcount, 0);
435 RETURN(el == NULL ? -ENOMEM : 0);
438 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
439 const struct cl_object_conf *conf)
444 static const struct cl_object_operations echo_cl_obj_ops = {
445 .coo_page_init = echo_page_init,
446 .coo_lock_init = echo_lock_init,
447 .coo_io_init = echo_io_init,
448 .coo_conf_set = echo_conf_set
450 /** @} echo_cl_ops */
452 /** \defgroup echo_lu_ops lu_object operations
454 * operations for echo lu object.
/* object init: allocate the 'below' object from the next device, take
 * ownership of the caller's lov_oinfo (non-MD case) and register the
 * object on ec_objects; error paths are sampled out of this listing */
458 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
459 const struct lu_object_conf *conf)
461 struct echo_device *ed = cl2echo_dev(lu2cl_dev(obj->lo_dev));
462 struct echo_client_obd *ec = ed->ed_ec;
463 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
467 struct lu_object *below;
468 struct lu_device *under;
471 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
475 lu_object_add(obj, below);
478 if (!ed->ed_next_ismd) {
479 const struct cl_object_conf *cconf = lu2cl_conf(conf);
480 struct echo_object_conf *econf = cl2echo_conf(cconf);
482 LASSERT(econf->eoc_oinfo != NULL);
484 /* Transfer the oinfo pointer to eco that it won't be
486 eco->eo_oinfo = *econf->eoc_oinfo;
487 *econf->eoc_oinfo = NULL;
489 eco->eo_oinfo = NULL;
493 atomic_set(&eco->eo_npages, 0);
494 cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
496 spin_lock(&ec->ec_lock);
497 list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
498 spin_unlock(&ec->ec_lock);
/* object free: unchain under ec_lock, free the owned oinfo, release
 * the slab entry; requires all pages gone (eo_npages == 0) */
503 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
505 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
506 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
509 LASSERT(atomic_read(&eco->eo_npages) == 0);
511 spin_lock(&ec->ec_lock);
512 list_del_init(&eco->eo_obj_chain);
513 spin_unlock(&ec->ec_lock);
516 lu_object_header_fini(obj->lo_header);
518 if (eco->eo_oinfo != NULL)
519 OBD_FREE_PTR(eco->eo_oinfo);
521 OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
525 static int echo_object_print(const struct lu_env *env, void *cookie,
526 lu_printer_t p, const struct lu_object *o)
528 struct echo_object *obj = cl2echo_obj(lu2cl(o));
530 return (*p)(env, cookie, "echoclient-object@%p", obj);
533 static const struct lu_object_operations echo_lu_obj_ops = {
534 .loo_object_init = echo_object_init,
535 .loo_object_delete = NULL,
536 .loo_object_release = NULL,
537 .loo_object_free = echo_object_free,
538 .loo_object_print = echo_object_print,
539 .loo_object_invariant = NULL
541 /** @} echo_lu_ops */
543 /** \defgroup echo_lu_dev_ops lu_device operations
545 * Operations for echo lu device.
/* top-level allocator: echo client sits on top of the stack, so hdr
 * must be NULL; builds header + object and wires both op tables */
549 static struct lu_object *echo_object_alloc(const struct lu_env *env,
550 const struct lu_object_header *hdr,
551 struct lu_device *dev)
553 struct echo_object *eco;
554 struct lu_object *obj = NULL;
557 /* we're the top dev. */
558 LASSERT(hdr == NULL);
559 OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
561 struct cl_object_header *hdr = &eco->eo_hdr;
563 obj = &echo_obj2cl(eco)->co_lu;
564 cl_object_header_init(hdr);
565 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
567 lu_object_init(obj, &hdr->coh_lu, dev);
568 lu_object_add_top(&hdr->coh_lu, obj);
570 eco->eo_cl.co_ops = &echo_cl_obj_ops;
571 obj->lo_ops = &echo_lu_obj_ops;
576 static struct lu_device_operations echo_device_lu_ops = {
577 .ldo_object_alloc = echo_object_alloc,
580 /** @} echo_lu_dev_ops */
582 static struct cl_device_operations echo_device_cl_ops = {
585 /** \defgroup echo_init Setup and teardown
587 * Init and fini functions for echo client.
/* initialise the private cl_site embedded in the echo device and
 * point ed_site at it (the assignment line is sampled out) */
591 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
593 struct cl_site *site = &ed->ed_site_myself;
596 /* initialize site */
597 rc = cl_site_init(site, &ed->ed_cl);
599 CERROR("Cannot initilize site for echo client(%d)\n", rc);
603 rc = lu_site_init_finish(&site->cs_lu);
/* only tear the site down when we own it — in MD stacking the site
 * belongs to the MDS stack (see echo_device_alloc) */
611 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
614 if (!ed->ed_next_ismd)
615 cl_site_fini(ed->ed_site);
/* lu_context key callbacks managing per-thread echo_thread_info */
620 static void *echo_thread_key_init(const struct lu_context *ctx,
621 struct lu_context_key *key)
623 struct echo_thread_info *info;
625 OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
627 info = ERR_PTR(-ENOMEM);
631 static void echo_thread_key_fini(const struct lu_context *ctx,
632 struct lu_context_key *key, void *data)
634 struct echo_thread_info *info = data;
635 OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
638 static void echo_thread_key_exit(const struct lu_context *ctx,
639 struct lu_context_key *key, void *data)
643 static struct lu_context_key echo_thread_key = {
644 .lct_tags = LCT_CL_THREAD,
645 .lct_init = echo_thread_key_init,
646 .lct_fini = echo_thread_key_fini,
647 .lct_exit = echo_thread_key_exit
/* session key callbacks mirror the thread key; per the struct comment
 * above, no session state is actually used right now */
650 static void *echo_session_key_init(const struct lu_context *ctx,
651 struct lu_context_key *key)
653 struct echo_session_info *session;
655 OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
657 session = ERR_PTR(-ENOMEM);
661 static void echo_session_key_fini(const struct lu_context *ctx,
662 struct lu_context_key *key, void *data)
664 struct echo_session_info *session = data;
665 OBD_SLAB_FREE_PTR(session, echo_session_kmem);
668 static void echo_session_key_exit(const struct lu_context *ctx,
669 struct lu_context_key *key, void *data)
673 static struct lu_context_key echo_session_key = {
674 .lct_tags = LCT_SESSION,
675 .lct_init = echo_session_key_init,
676 .lct_fini = echo_session_key_fini,
677 .lct_exit = echo_session_key_exit
680 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
682 #ifdef HAVE_SERVER_SUPPORT
683 # define ECHO_SEQ_WIDTH 0xffffffff
/* set up the client-side sequence manager named "srv-<obdname>";
 * frees everything again on failure via out_free_seq */
684 static int echo_fid_init(struct echo_device *ed, char *obd_name,
685 struct seq_server_site *ss)
691 OBD_ALLOC_PTR(ed->ed_cl_seq);
692 if (ed->ed_cl_seq == NULL)
695 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
697 GOTO(out_free_seq, rc = -ENOMEM);
699 snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
701 /* Init client side sequence-manager */
702 rc = seq_client_init(ed->ed_cl_seq, NULL,
704 prefix, ss->ss_server_seq);
705 ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
/* prefix is only needed by seq_client_init; freed right away */
706 OBD_FREE(prefix, MAX_OBD_NAME + 5);
708 GOTO(out_free_seq, rc);
713 OBD_FREE_PTR(ed->ed_cl_seq);
714 ed->ed_cl_seq = NULL;
/* undo echo_fid_init; safe to call when the seq was never set up */
718 static int echo_fid_fini(struct obd_device *obddev)
720 struct echo_device *ed = obd2echo_dev(obddev);
723 if (ed->ed_cl_seq != NULL) {
724 seq_client_fini(ed->ed_cl_seq);
725 OBD_FREE_PTR(ed->ed_cl_seq);
726 ed->ed_cl_seq = NULL;
/* tear down local OID storage — only relevant for MD stacking */
732 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
736 if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
737 local_oid_storage_fini(env, ed->ed_los);
/* create a local file @name under @pfid on the bottom dt device and
 * return its fid; the new object is dropped from cache because
 * local_storage runs on its own, not-fully-set-up stack (see below) */
743 echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
744 struct local_oid_storage *los,
745 const struct lu_fid *pfid, const char *name,
746 __u32 mode, struct lu_fid *fid)
748 struct dt_object *parent = NULL;
749 struct dt_object *dto = NULL;
753 LASSERT(!fid_is_zero(pfid));
754 parent = dt_locate(env, emd->emd_bottom, pfid);
755 if (unlikely(IS_ERR(parent)))
756 RETURN(PTR_ERR(parent));
758 /* create local file with @fid */
759 dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
762 GOTO(out_put, rc = PTR_ERR(dto));
764 *fid = *lu_object_fid(&dto->do_lu);
765 /* since stack is not fully set up the local_storage uses own stack
766 * and we should drop its object from cache */
767 lu_object_put_nocache(env, &dto->do_lu);
771 lu_object_put(env, &parent->do_lu);
/* obtain (and on MDS node 0, create) the ROOT_ECHO directory and
 * record its fid in ed->ed_root_fid; fini the los again on error */
776 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
777 struct echo_device *ed)
783 /* Setup local dirs */
784 fid.f_seq = FID_SEQ_LOCAL_NAME;
787 rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
791 lu_echo_root_fid(&fid);
792 if (echo_md_seq_site(emd)->ss_node_id == 0) {
793 rc = echo_md_local_file_create(env, emd, ed->ed_los,
794 &emd->emd_local_root_fid,
795 echo_md_root_dir_name, S_IFDIR |
796 S_IRUGO | S_IWUSR | S_IXUGO,
799 CERROR("%s: create md echo root fid failed: rc = %d\n",
800 emd2obd_dev(emd)->obd_name, rc);
804 ed->ed_root_fid = fid;
808 echo_ed_los_fini(env, ed);
812 #endif /* HAVE_SERVER_SUPPORT */
/* allocate and wire up an echo device on top of the configured target:
 * MDT targets select MD mode (requires HAVE_SERVER_SUPPORT and reuse
 * the MDS stack's site), OST/OSC targets select data mode with a
 * private site; any other target type is rejected with -EINVAL */
814 static struct lu_device *echo_device_alloc(const struct lu_env *env,
815 struct lu_device_type *t,
816 struct lustre_cfg *cfg)
818 struct lu_device *next;
819 struct echo_device *ed;
820 struct cl_device *cd;
821 struct obd_device *obd = NULL; /* to keep compiler happy */
822 struct obd_device *tgt;
823 const char *tgt_type_name;
830 GOTO(out, rc = -ENOMEM);
834 rc = cl_device_init(cd, t);
838 cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
839 cd->cd_ops = &echo_device_cl_ops;
/* cfg[0] = our obd name, cfg[1] = target obd name */
842 obd = class_name2obd(lustre_cfg_string(cfg, 0));
843 LASSERT(obd != NULL);
844 LASSERT(env != NULL);
846 tgt = class_name2obd(lustre_cfg_string(cfg, 1));
848 CERROR("Can not find tgt device %s\n",
849 lustre_cfg_string(cfg, 1));
850 GOTO(out, rc = -ENODEV);
853 next = tgt->obd_lu_dev;
/* classify the target: MDT -> MD mode; OST/OSC -> data mode */
855 if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
856 ed->ed_next_ismd = 1;
857 } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
858 strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
859 ed->ed_next_ismd = 0;
860 rc = echo_site_init(env, ed);
864 GOTO(out, rc = -EINVAL);
869 rc = echo_client_setup(env, obd, cfg);
873 ed->ed_ec = &obd->u.echo_client;
876 if (ed->ed_next_ismd) {
877 #ifdef HAVE_SERVER_SUPPORT
878 /* Suppose to connect to some Metadata layer */
879 struct lu_site *ls = NULL;
880 struct lu_device *ld = NULL;
881 struct md_device *md = NULL;
882 struct echo_md_device *emd = NULL;
886 CERROR("%s is not lu device type!\n",
887 lustre_cfg_string(cfg, 1));
888 GOTO(out, rc = -EINVAL);
/* cfg[2] names the layer on the MDS stack to attach beneath */
891 tgt_type_name = lustre_cfg_string(cfg, 2);
892 if (!tgt_type_name) {
893 CERROR("%s no type name for echo %s setup\n",
894 lustre_cfg_string(cfg, 1),
895 tgt->obd_type->typ_name);
896 GOTO(out, rc = -EINVAL);
/* scan the site's device list for the requested device type */
901 spin_lock(&ls->ls_ld_lock);
902 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
903 if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
908 spin_unlock(&ls->ls_ld_lock);
911 CERROR("%s is not lu device type!\n",
912 lustre_cfg_string(cfg, 1));
913 GOTO(out, rc = -EINVAL);
917 /* For MD echo client, it will use the site in MDS stack */
918 ed->ed_site_myself.cs_lu = *ls;
919 ed->ed_site = &ed->ed_site_myself;
920 ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu;
921 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
923 CERROR("echo fid init error %d\n", rc);
927 md = lu2md_dev(next);
928 emd = lu2emd_dev(&md->md_lu_dev);
929 rc = echo_md_root_get(env, emd, ed);
931 CERROR("%s: get root error: rc = %d\n",
932 emd2obd_dev(emd)->obd_name, rc);
935 #else /* !HAVE_SERVER_SUPPORT */
936 CERROR("Local operations are NOT supported on client side. "
937 "Only remote operations are supported. Metadata client "
938 "must be run on server side.\n");
939 GOTO(out, rc = -EOPNOTSUPP);
940 #endif /* HAVE_SERVER_SUPPORT */
942 /* if echo client is to be stacked upon ost device, the next is
943 * NULL since ost is not a clio device so far */
944 if (next != NULL && !lu_device_is_cl(next))
947 tgt_type_name = tgt->obd_type->typ_name;
949 LASSERT(next != NULL);
950 if (next->ld_site != NULL)
951 GOTO(out, rc = -EBUSY);
953 next->ld_site = &ed->ed_site->cs_lu;
954 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
955 next->ld_type->ldt_name,
960 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
964 RETURN(&cd->cd_lu_dev);
/* error unwinding below — the goto labels themselves are sampled
 * out of this listing */
969 rc2 = echo_client_cleanup(obd);
971 CERROR("Cleanup obd device %s error(%d)\n",
976 echo_site_fini(env, ed);
978 cl_device_fini(&ed->ed_cl);
988 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
989 const char *name, struct lu_device *next)
/* fini: walk the sub-stack downward, fini-ing each device; skipped
 * entirely when stacked on MD (the MDS stack owns those devices) */
995 static struct lu_device *echo_device_fini(const struct lu_env *env,
998 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
999 struct lu_device *next = ed->ed_next;
1001 while (next && !ed->ed_next_ismd)
1002 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
1006 static void echo_lock_release(const struct lu_env *env,
1007 struct echo_lock *ecl,
1010 struct cl_lock *clk = echo_lock2cl(ecl);
1012 cl_lock_release(env, clk);
/* free: mark all objects deleted, purge the site, busy-wait (1s
 * steps) for the last references, then tear down the obd client,
 * fid/los state, the sub-stack and finally site + cl_device */
1015 static struct lu_device *echo_device_free(const struct lu_env *env,
1016 struct lu_device *d)
1018 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
1019 struct echo_client_obd *ec = ed->ed_ec;
1020 struct echo_object *eco;
1021 struct lu_device *next = ed->ed_next;
1023 CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
1026 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1028 /* check if there are objects still alive.
1029 * It shouldn't have any object because lu_site_purge would cleanup
1030 * all of cached objects. Anyway, probably the echo device is being
1031 * parallelly accessed.
1033 spin_lock(&ec->ec_lock);
1034 list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
1035 eco->eo_deleted = 1;
1036 spin_unlock(&ec->ec_lock);
1039 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1042 "Waiting for the reference of echo object to be dropped\n");
1044 /* Wait for the last reference to be dropped. */
1045 spin_lock(&ec->ec_lock);
1046 while (!list_empty(&ec->ec_objects)) {
1047 spin_unlock(&ec->ec_lock);
1048 CERROR("echo_client still has objects at cleanup time, "
1049 "wait for 1 second\n");
1050 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1051 cfs_time_seconds(1));
1052 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1053 spin_lock(&ec->ec_lock);
1055 spin_unlock(&ec->ec_lock);
1057 LASSERT(list_empty(&ec->ec_locks));
1059 CDEBUG(D_INFO, "No object exists, exiting...\n");
1061 echo_client_cleanup(d->ld_obd);
1062 #ifdef HAVE_SERVER_SUPPORT
1063 echo_fid_fini(d->ld_obd);
1064 echo_ed_los_fini(env, ed);
1066 while (next && !ed->ed_next_ismd)
1067 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
1069 LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
1070 echo_site_fini(env, ed);
1071 cl_device_fini(&ed->ed_cl);
/* lu_device_type wiring for the echo client device */
1077 static const struct lu_device_type_operations echo_device_type_ops = {
1078 .ldto_init = echo_type_init,
1079 .ldto_fini = echo_type_fini,
1081 .ldto_start = echo_type_start,
1082 .ldto_stop = echo_type_stop,
1084 .ldto_device_alloc = echo_device_alloc,
1085 .ldto_device_free = echo_device_free,
1086 .ldto_device_init = echo_device_init,
1087 .ldto_device_fini = echo_device_fini
1090 static struct lu_device_type echo_device_type = {
1091 .ldt_tags = LU_DEVICE_CL,
1092 .ldt_name = LUSTRE_ECHO_CLIENT_NAME,
1093 .ldt_ops = &echo_device_type_ops,
1094 .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1098 /** \defgroup echo_exports Exported operations
1100 * exporting functions to echo client
1105 /* Interfaces to echo client obd device */
/* find-or-create the echo object for ostid @oi; the freshly allocated
 * lov_oinfo is handed to echo_object_init via conf->eoc_oinfo (on
 * success it is NULLed so the cleanup path below does not free it) */
1106 static struct echo_object *
1107 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
1110 struct echo_thread_info *info;
1111 struct echo_object_conf *conf;
1112 struct echo_object *eco;
1113 struct cl_object *obj;
1114 struct lov_oinfo *oinfo = NULL;
1120 LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
1121 LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
1123 /* Never return an object if the obd is to be freed. */
1124 if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1125 RETURN(ERR_PTR(-ENODEV));
1127 env = cl_env_get(&refcheck);
1129 RETURN((void *)env);
1131 info = echo_env_info(env);
1132 conf = &info->eti_conf;
1134 OBD_ALLOC_PTR(oinfo);
1136 GOTO(out, eco = ERR_PTR(-ENOMEM));
1138 oinfo->loi_oi = *oi;
1139 conf->eoc_cl.u.coc_oinfo = oinfo;
1142 /* If echo_object_init() is successful then ownership of oinfo
1143 * is transferred to the object. */
1144 conf->eoc_oinfo = &oinfo;
1146 fid = &info->eti_fid;
1147 rc = ostid_to_fid(fid, oi, 0);
1149 GOTO(out, eco = ERR_PTR(rc));
1151 /* In the function below, .hs_keycmp resolves to
1152 * lu_obj_hop_keycmp() */
1153 /* coverity[overrun-buffer-val] */
1154 obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1156 GOTO(out, eco = (void*)obj);
1158 eco = cl2echo_obj(obj);
1159 if (eco->eo_deleted) {
/* raced with deletion: drop the ref and let the caller retry */
1160 cl_object_put(env, obj);
1161 eco = ERR_PTR(-EAGAIN);
1166 OBD_FREE_PTR(oinfo);
1168 cl_env_put(env, &refcheck);
/* drop a reference on @eco; if the object was marked deleted, flag
 * HEARD_BANSHEE so the final put destroys it instead of caching it */
1172 static int cl_echo_object_put(struct echo_object *eco)
1175 struct cl_object *obj = echo_obj2cl(eco);
1179 env = cl_env_get(&refcheck);
1181 RETURN(PTR_ERR(env));
1183 /* an external function to kill an object? */
1184 if (eco->eo_deleted) {
1185 struct lu_object_header *loh = obj->co_lu.lo_header;
1186 LASSERT(&eco->eo_hdr == luh2coh(loh));
1187 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1190 cl_object_put(env, obj);
1191 cl_env_put(env, &refcheck);
/* take an extent lock covering [start, end] on @eco; on success the
 * echo_lock is chained on ec_locks and identified by *cookie so that
 * cl_echo_cancel0 can find it again later */
1195 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1196 u64 start, u64 end, int mode,
1197 __u64 *cookie , __u32 enqflags)
1200 struct cl_lock *lck;
1201 struct cl_object *obj;
1202 struct cl_lock_descr *descr;
1203 struct echo_thread_info *info;
1207 info = echo_env_info(env);
1209 lck = &info->eti_lock;
1210 obj = echo_obj2cl(eco);
1212 memset(lck, 0, sizeof(*lck));
1213 descr = &lck->cll_descr;
1214 descr->cld_obj = obj;
1215 descr->cld_start = cl_index(obj, start);
1216 descr->cld_end = cl_index(obj, end);
/* LCK_PW maps to CLM_WRITE; every other mode is treated as read */
1217 descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1218 descr->cld_enq_flags = enqflags;
1221 rc = cl_lock_request(env, io, lck);
1223 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1224 struct echo_lock *el;
1226 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1227 spin_lock(&ec->ec_lock);
/* first user chains the lock and assigns a unique cookie */
1228 if (list_empty(&el->el_chain)) {
1229 list_add(&el->el_chain, &ec->ec_locks);
1230 el->el_cookie = ++ec->ec_unique;
1232 atomic_inc(&el->el_refcount);
1233 *cookie = el->el_cookie;
1234 spin_unlock(&ec->ec_lock);
/* cancel the lock matching @cookie: unchain it when the last ref
 * drops, then release it via echo_lock_release */
1239 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1242 struct echo_client_obd *ec = ed->ed_ec;
1243 struct echo_lock *ecl = NULL;
1244 struct list_head *el;
1245 int found = 0, still_used = 0;
1248 LASSERT(ec != NULL);
1249 spin_lock(&ec->ec_lock);
1250 list_for_each(el, &ec->ec_locks) {
1251 ecl = list_entry(el, struct echo_lock, el_chain);
1252 CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
1253 found = (ecl->el_cookie == cookie);
1255 if (atomic_dec_and_test(&ecl->el_refcount))
1256 list_del_init(&ecl->el_chain);
1262 spin_unlock(&ec->ec_lock);
1267 echo_lock_release(env, ecl, still_used);
/* async-commit callback: move each committed page onto qout */
1271 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1272 struct cl_page *page)
1274 struct echo_thread_info *info;
1275 struct cl_2queue *queue;
1277 info = echo_env_info(env);
1278 LASSERT(io == &info->eti_io);
1280 queue = &info->eti_queue;
1281 cl_page_list_add(&queue->c2_qout, page);
/* read or write @npages pages starting at page-aligned @offset under
 * a PR/PW extent lock; @async is honoured for writes only (the
 * commit_async path), reads always go through cl_io_submit_sync */
1284 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1285 struct page **pages, int npages, int async)
1288 struct echo_thread_info *info;
1289 struct cl_object *obj = echo_obj2cl(eco);
1290 struct echo_device *ed = eco->eo_dev;
1291 struct cl_2queue *queue;
1293 struct cl_page *clp;
1294 struct lustre_handle lh = { 0 };
1295 int page_size = cl_page_size(obj);
1301 LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1302 LASSERT(ed->ed_next != NULL);
1303 env = cl_env_get(&refcheck);
1305 RETURN(PTR_ERR(env));
1307 info = echo_env_info(env);
1309 queue = &info->eti_queue;
1311 cl_2queue_init(queue);
1313 io->ci_ignore_layout = 1;
1314 rc = cl_io_init(env, io, CIT_MISC, obj);
1320 rc = cl_echo_enqueue0(env, eco, offset,
1321 offset + npages * PAGE_CACHE_SIZE - 1,
1322 rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1325 GOTO(error_lock, rc);
/* build the queue of transient pages covering the transfer */
1327 for (i = 0; i < npages; i++) {
1329 clp = cl_page_find(env, obj, cl_index(obj, offset),
1330 pages[i], CPT_TRANSIENT);
1335 LASSERT(clp->cp_type == CPT_TRANSIENT);
1337 rc = cl_page_own(env, io, clp);
1339 LASSERT(clp->cp_state == CPS_FREEING);
1340 cl_page_put(env, clp);
1344 cl_2queue_add(queue, clp);
1346 /* drop the reference count for cl_page_find, so that the page
1347 * will be freed in cl_2queue_fini. */
1348 cl_page_put(env, clp);
1349 cl_page_clip(env, clp, 0, page_size);
1351 offset += page_size;
1355 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1357 async = async && (typ == CRT_WRITE);
1359 rc = cl_io_commit_async(env, io, &queue->c2_qin,
1361 echo_commit_callback);
1363 rc = cl_io_submit_sync(env, io, typ, queue, 0);
1364 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1365 async ? "async" : "sync", rc);
1368 cl_echo_cancel0(env, ed, lh.cookie);
/* common teardown: discard/disown/fini the queue, fini the io */
1371 cl_2queue_discard(env, io, queue);
1372 cl_2queue_disown(env, io, queue);
1373 cl_2queue_fini(env, queue);
1374 cl_io_fini(env, io);
1376 cl_env_put(env, &refcheck);
1379 /** @} echo_exports */
1382 static u64 last_object_id;
1384 #ifdef HAVE_SERVER_SUPPORT
/* render numeric @id into @name and fill the lu_name descriptor */
1385 static inline void echo_md_build_name(struct lu_name *lname, char *name,
1388 sprintf(name, LPU64, id);
1389 lname->ln_name = name;
1390 lname->ln_namelen = strlen(name);
1393 /* similar to mdt_attr_get_complex */
/* fetch the LOV EA into the per-thread, grow-on-demand big_lmm
 * buffer; used when the caller's buffer returned -ERANGE */
1394 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1397 struct echo_thread_info *info = echo_env_info(env);
1402 LASSERT(ma->ma_lmm_size > 0);
/* probe with LU_BUF_NULL first: rc is the required EA size */
1404 rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1408 /* big_lmm may need to be grown */
1409 if (info->eti_big_lmmsize < rc) {
1410 int size = size_roundup_power2(rc);
1412 if (info->eti_big_lmmsize > 0) {
1413 /* free old buffer */
1414 LASSERT(info->eti_big_lmm);
1415 OBD_FREE_LARGE(info->eti_big_lmm,
1416 info->eti_big_lmmsize);
1417 info->eti_big_lmm = NULL;
1418 info->eti_big_lmmsize = 0;
1421 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1422 if (info->eti_big_lmm == NULL)
1424 info->eti_big_lmmsize = size;
1426 LASSERT(info->eti_big_lmmsize >= rc);
1428 info->eti_buf.lb_buf = info->eti_big_lmm;
1429 info->eti_buf.lb_len = info->eti_big_lmmsize;
1430 rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
/* hand the thread-local buffer to the caller via ma; note it stays
 * owned by echo_thread_info, not by the caller */
1434 ma->ma_valid |= MA_LOV;
1435 ma->ma_lmm = info->eti_big_lmm;
1436 ma->ma_lmm_size = rc;
/* gather inode attributes, LOV EA and (for directories, when POSIX
 * ACLs are configured) the default ACL for @next, per ma->ma_need */
1441 static int echo_attr_get_complex(const struct lu_env *env,
1442 struct md_object *next,
1445 struct echo_thread_info *info = echo_env_info(env);
1446 struct lu_buf *buf = &info->eti_buf;
1447 umode_t mode = lu_object_attr(&next->mo_lu);
1448 int need = ma->ma_need;
1455 if (need & MA_INODE) {
1456 ma->ma_need = MA_INODE;
1457 rc = mo_attr_get(env, next, ma);
1460 ma->ma_valid |= MA_INODE;
1463 if (need & MA_LOV) {
1464 if (S_ISREG(mode) || S_ISDIR(mode)) {
1465 LASSERT(ma->ma_lmm_size > 0);
1466 buf->lb_buf = ma->ma_lmm;
1467 buf->lb_len = ma->ma_lmm_size;
1468 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1470 ma->ma_lmm_size = rc2;
1471 ma->ma_valid |= MA_LOV;
/* -ENODATA just means "no striping EA"; not an error */
1472 } else if (rc2 == -ENODATA) {
1474 ma->ma_lmm_size = 0;
/* -ERANGE: EA larger than the caller's buffer — retry via
 * the grow-on-demand big_lmm path above */
1475 } else if (rc2 == -ERANGE) {
1476 rc2 = echo_big_lmm_get(env, next, ma);
1478 GOTO(out, rc = rc2);
1480 GOTO(out, rc = rc2);
1485 #ifdef CONFIG_FS_POSIX_ACL
1486 if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1487 buf->lb_buf = ma->ma_acl;
1488 buf->lb_len = ma->ma_acl_size;
1489 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1491 ma->ma_acl_size = rc2;
1492 ma->ma_valid |= MA_ACL_DEF;
1493 } else if (rc2 == -ENODATA) {
1495 ma->ma_acl_size = 0;
1497 GOTO(out, rc = rc2);
1503 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
1504 rc, ma->ma_valid, ma->ma_lmm);
1509 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1510 struct md_object *parent, struct lu_fid *fid,
1511 struct lu_name *lname, struct md_op_spec *spec,
1514 struct lu_object *ec_child, *child;
1515 struct lu_device *ld = ed->ed_next;
1516 struct echo_thread_info *info = echo_env_info(env);
1517 struct lu_fid *fid2 = &info->eti_fid2;
1518 struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
1523 rc = mdo_lookup(env, parent, lname, fid2, spec);
1526 else if (rc != -ENOENT)
1529 ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1531 if (IS_ERR(ec_child)) {
1532 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1534 RETURN(PTR_ERR(ec_child));
1537 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1538 if (child == NULL) {
1539 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1540 GOTO(out_put, rc = -EINVAL);
1543 CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1544 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1547 * Do not perform lookup sanity check. We know that name does not exist.
1549 spec->sp_cr_lookup = 0;
1550 rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1552 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1555 CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc = %d\n",
1556 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1559 lu_object_put(env, ec_child);
/*
 * Point ma->ma_lmm at a buffer large enough for the LOV EA: the small
 * per-thread eti_lmm for non-MDD layers, the preallocated eti_big_lmm
 * when the next device is MDD (strcmp() == 0 means name matches MDD).
 */
1563 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1566 struct echo_thread_info *info = echo_env_info(env);
1568 if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1569 ma->ma_lmm = (void *)&info->eti_lmm;
1570 ma->ma_lmm_size = sizeof(info->eti_lmm);
/* MDD: the big buffer must already have been set up by the caller */
1572 LASSERT(info->eti_big_lmmsize);
1573 ma->ma_lmm = info->eti_big_lmm;
1574 ma->ma_lmm_size = info->eti_big_lmmsize;
/*
 * ECHO_MD_CREATE/MKDIR handler body: create either a single named child
 * (when @name is given) or @count children named by sequential @id under
 * @ec_parent. Optional striping is passed through a lov_user_md_v3 EA
 * when @stripe_count != 0.
 */
1580 static int echo_create_md_object(const struct lu_env *env,
1581 struct echo_device *ed,
1582 struct lu_object *ec_parent,
1584 char *name, int namelen,
1585 __u64 id, __u32 mode, int count,
1586 int stripe_count, int stripe_offset)
1588 struct lu_object *parent;
1589 struct echo_thread_info *info = echo_env_info(env);
1590 struct lu_name *lname = &info->eti_lname;
1591 struct md_op_spec *spec = &info->eti_spec;
1592 struct md_attr *ma = &info->eti_ma;
1593 struct lu_device *ld = ed->ed_next;
1599 if (ec_parent == NULL)
1601 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1605 memset(ma, 0, sizeof(*ma));
1606 memset(spec, 0, sizeof(*spec));
/* request striped creation; stripe_count == -1 means "use defaults" */
1607 if (stripe_count != 0) {
1608 spec->sp_cr_flags |= FMODE_WRITE;
1609 echo_set_lmm_size(env, ld, ma);
1610 if (stripe_count != -1) {
1611 struct lov_user_md_v3 *lum = &info->eti_lum;
1613 lum->lmm_magic = LOV_USER_MAGIC_V3;
1614 lum->lmm_stripe_count = stripe_count;
1615 lum->lmm_stripe_offset = stripe_offset;
1616 lum->lmm_pattern = 0;
1617 spec->u.sp_ea.eadata = lum;
1618 spec->u.sp_ea.eadatalen = sizeof(*lum);
1619 spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1623 ma->ma_attr.la_mode = mode;
1624 ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1625 ma->ma_attr.la_ctime = cfs_time_current_64();
1628 lname->ln_name = name;
1629 lname->ln_namelen = namelen;
1630 /* If name is specified, only create one object by name */
1631 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1636 /* Create multiple object sequenced by id */
1637 for (i = 0; i < count; i++) {
1638 char *tmp_name = info->eti_name;
1640 echo_md_build_name(lname, tmp_name, id);
1642 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1645 CERROR("Can not create child %s: rc = %d\n", tmp_name,
/*
 * Look up @lname under @parent and return a referenced lu_object for the
 * child (caller must lu_object_put() it), or ERR_PTR on lookup failure.
 */
1656 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1657 struct echo_device *ed,
1658 struct md_object *parent,
1659 struct lu_name *lname)
1661 struct echo_thread_info *info = echo_env_info(env);
1662 struct lu_fid *fid = &info->eti_fid;
1663 struct lu_object *child;
1667 CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1669 rc = mdo_lookup(env, parent, lname, fid, NULL);
1671 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1672 RETURN(ERR_PTR(rc));
1675 /* In the function below, .hs_keycmp resolves to
1676 * lu_obj_hop_keycmp() */
1677 /* coverity[overrun-buffer-val] */
1678 child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
/*
 * ECHO_MD_SETATTR handler body: for @count children named by sequential
 * @id under @ec_parent, set a "user.*.test1" xattr on each child.
 * Child references are dropped on every path.
 */
1683 static int echo_setattr_object(const struct lu_env *env,
1684 struct echo_device *ed,
1685 struct lu_object *ec_parent,
1686 __u64 id, int count)
1688 struct lu_object *parent;
1689 struct echo_thread_info *info = echo_env_info(env);
1690 struct lu_name *lname = &info->eti_lname;
1691 char *name = info->eti_name;
1692 struct lu_device *ld = ed->ed_next;
1693 struct lu_buf *buf = &info->eti_buf;
1699 if (ec_parent == NULL)
1701 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1705 for (i = 0; i < count; i++) {
1706 struct lu_object *ec_child, *child;
1708 echo_md_build_name(lname, name, id);
1710 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1711 if (IS_ERR(ec_child)) {
1712 CERROR("Can't find child %s: rc = %ld\n",
1713 lname->ln_name, PTR_ERR(ec_child));
1714 RETURN(PTR_ERR(ec_child));
1717 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1718 if (child == NULL) {
1719 CERROR("Can not locate the child %s\n", lname->ln_name);
1720 lu_object_put(env, ec_child);
1725 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1726 PFID(lu_object_fid(child)));
1728 buf->lb_buf = info->eti_xattr_buf;
1729 buf->lb_len = sizeof(info->eti_xattr_buf);
/* note: reuses the eti_name buffer for the xattr name */
1731 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1732 rc = mo_xattr_set(env, lu2md(child), buf, name,
1735 CERROR("Can not setattr child "DFID": rc = %d\n",
1736 PFID(lu_object_fid(child)), rc);
1737 lu_object_put(env, ec_child);
1740 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1741 PFID(lu_object_fid(child)));
1743 lu_object_put(env, ec_child);
/*
 * ECHO_MD_GETATTR handler body: for @count children named by sequential
 * @id under @ec_parent, fetch the full attribute set (inode, LOV EA,
 * parent FID, HSM, default ACL) via echo_attr_get_complex().
 */
1748 static int echo_getattr_object(const struct lu_env *env,
1749 struct echo_device *ed,
1750 struct lu_object *ec_parent,
1751 __u64 id, int count)
1753 struct lu_object *parent;
1754 struct echo_thread_info *info = echo_env_info(env);
1755 struct lu_name *lname = &info->eti_lname;
1756 char *name = info->eti_name;
1757 struct md_attr *ma = &info->eti_ma;
1758 struct lu_device *ld = ed->ed_next;
1764 if (ec_parent == NULL)
1766 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1770 memset(ma, 0, sizeof(*ma));
1771 ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
/* the per-thread xattr buffer doubles as the ACL buffer */
1772 ma->ma_acl = info->eti_xattr_buf;
1773 ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1775 for (i = 0; i < count; i++) {
1776 struct lu_object *ec_child, *child;
1779 echo_md_build_name(lname, name, id);
/* reset the LOV buffer each iteration (it is clobbered per child) */
1780 echo_set_lmm_size(env, ld, ma);
1782 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1783 if (IS_ERR(ec_child)) {
1784 CERROR("Can't find child %s: rc = %ld\n",
1785 lname->ln_name, PTR_ERR(ec_child));
1786 RETURN(PTR_ERR(ec_child));
1789 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1790 if (child == NULL) {
1791 CERROR("Can not locate the child %s\n", lname->ln_name);
1792 lu_object_put(env, ec_child);
1796 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1797 PFID(lu_object_fid(child)));
1798 rc = echo_attr_get_complex(env, lu2md(child), ma);
1800 CERROR("Can not getattr child "DFID": rc = %d\n",
1801 PFID(lu_object_fid(child)), rc);
1802 lu_object_put(env, ec_child);
1805 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1806 PFID(lu_object_fid(child)));
1808 lu_object_put(env, ec_child);
/*
 * ECHO_MD_LOOKUP handler body: run @count name lookups under @ec_parent
 * for children named by sequential @id; used to benchmark mdo_lookup().
 */
1814 static int echo_lookup_object(const struct lu_env *env,
1815 struct echo_device *ed,
1816 struct lu_object *ec_parent,
1817 __u64 id, int count)
1819 struct lu_object *parent;
1820 struct echo_thread_info *info = echo_env_info(env);
1821 struct lu_name *lname = &info->eti_lname;
1822 char *name = info->eti_name;
1823 struct lu_fid *fid = &info->eti_fid;
1824 struct lu_device *ld = ed->ed_next;
1828 if (ec_parent == NULL)
1830 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1834 /*prepare the requests*/
1835 for (i = 0; i < count; i++) {
1836 echo_md_build_name(lname, name, id);
1838 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1839 PFID(lu_object_fid(parent)), lname->ln_name, parent);
1841 rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
1843 CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1846 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1847 PFID(lu_object_fid(parent)), lname->ln_name, parent);
/*
 * Unlink one child @lname of @parent: look it up, locate it on the next
 * layer, refuse remote objects, then mdo_unlink(). The child reference
 * obtained from the lookup is dropped on every path.
 */
1854 static int echo_md_destroy_internal(const struct lu_env *env,
1855 struct echo_device *ed,
1856 struct md_object *parent,
1857 struct lu_name *lname,
1860 struct lu_device *ld = ed->ed_next;
1861 struct lu_object *ec_child;
1862 struct lu_object *child;
1867 ec_child = echo_md_lookup(env, ed, parent, lname);
1868 if (IS_ERR(ec_child)) {
1869 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1871 RETURN(PTR_ERR(ec_child));
1874 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1875 if (child == NULL) {
1876 CERROR("Can not locate the child %s\n", lname->ln_name);
1877 GOTO(out_put, rc = -EINVAL);
/* cross-MDT objects cannot be destroyed through this path */
1880 if (lu_object_remote(child)) {
1881 CERROR("Can not destroy remote object %s: rc = %d\n",
1882 lname->ln_name, -EPERM);
1883 GOTO(out_put, rc = -EPERM);
1885 CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1886 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1888 rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
1890 CERROR("Can not unlink child %s: rc = %d\n",
1891 lname->ln_name, rc);
1894 CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1895 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1897 lu_object_put(env, ec_child);
/*
 * ECHO_MD_DESTROY/RMDIR handler body: unlink either a single named child
 * (when @name is given) or @count children named by sequential @id.
 */
1901 static int echo_destroy_object(const struct lu_env *env,
1902 struct echo_device *ed,
1903 struct lu_object *ec_parent,
1904 char *name, int namelen,
1905 __u64 id, __u32 mode,
1908 struct echo_thread_info *info = echo_env_info(env);
1909 struct lu_name *lname = &info->eti_lname;
1910 struct md_attr *ma = &info->eti_ma;
1911 struct lu_device *ld = ed->ed_next;
1912 struct lu_object *parent;
1917 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1921 memset(ma, 0, sizeof(*ma));
1922 ma->ma_attr.la_mode = mode;
1923 ma->ma_attr.la_valid = LA_CTIME;
1924 ma->ma_attr.la_ctime = cfs_time_current_64();
1925 ma->ma_need = MA_INODE;
1929 lname->ln_name = name;
1930 lname->ln_namelen = namelen;
1931 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1936 /*prepare the requests*/
1937 for (i = 0; i < count; i++) {
1938 char *tmp_name = info->eti_name;
1941 echo_md_build_name(lname, tmp_name, id);
1943 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
/* NOTE(review): error message prints the caller-supplied 'name', not the
 * generated 'tmp_name' being unlinked in this loop — confirm intent. */
1946 CERROR("Can not unlink child %s: rc = %d\n", name, rc);
/*
 * Walk a '/'-separated @path component by component starting from the
 * echo device root FID, returning a referenced lu_object for the final
 * component (or ERR_PTR). Each intermediate parent reference is dropped
 * as the walk descends.
 */
1955 static struct lu_object *echo_resolve_path(const struct lu_env *env,
1956 struct echo_device *ed, char *path,
1959 struct lu_device *ld = ed->ed_next;
1960 struct echo_thread_info *info = echo_env_info(env);
1961 struct lu_fid *fid = &info->eti_fid;
1962 struct lu_name *lname = &info->eti_lname;
1963 struct lu_object *parent = NULL;
1964 struct lu_object *child = NULL;
/* start the walk at the echo root */
1968 *fid = ed->ed_root_fid;
1970 /* In the function below, .hs_keycmp resolves to
1971 * lu_obj_hop_keycmp() */
1972 /* coverity[overrun-buffer-val] */
1973 parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1974 if (IS_ERR(parent)) {
1975 CERROR("Can not find the parent "DFID": rc = %ld\n",
1976 PFID(fid), PTR_ERR(parent));
1981 struct lu_object *ld_parent;
/* strsep() consumes one path component per loop iteration */
1984 e = strsep(&path, "/");
1989 if (!path || path[0] == '\0')
1995 lname->ln_namelen = strlen(e);
1997 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
1998 if (ld_parent == NULL) {
1999 lu_object_put(env, parent);
2004 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
2005 lu_object_put(env, parent);
2006 if (IS_ERR(child)) {
2007 rc = (int)PTR_ERR(child);
2008 CERROR("lookup %s under parent "DFID": rc = %d\n",
2009 lname->ln_name, PFID(lu_object_fid(ld_parent)),
2016 RETURN(ERR_PTR(rc));
/*
 * Populate the env's lu_ucred from the current task's credentials so MD
 * operations run with the caller's identity; strips fs capabilities for
 * non-root callers.
 */
2021 static void echo_ucred_init(struct lu_env *env)
2023 struct lu_ucred *ucred = lu_ucred(env);
2025 ucred->uc_valid = UCRED_INVALID;
/* no supplementary groups */
2027 ucred->uc_suppgids[0] = -1;
2028 ucred->uc_suppgids[1] = -1;
2030 ucred->uc_uid = ucred->uc_o_uid =
2031 from_kuid(&init_user_ns, current_uid());
2032 ucred->uc_gid = ucred->uc_o_gid =
2033 from_kgid(&init_user_ns, current_gid());
2034 ucred->uc_fsuid = ucred->uc_o_fsuid =
2035 from_kuid(&init_user_ns, current_fsuid());
2036 ucred->uc_fsgid = ucred->uc_o_fsgid =
2037 from_kgid(&init_user_ns, current_fsgid());
2038 ucred->uc_cap = cfs_curproc_cap_pack();
2040 /* remove fs privilege for non-root user. */
2041 if (ucred->uc_fsuid)
2042 ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2043 ucred->uc_valid = UCRED_NEW;
/* Mark the env's lu_ucred as no longer carrying caller credentials. */
2046 static void echo_ucred_fini(struct lu_env *env)
2048 struct lu_ucred *ucred = lu_ucred(env);
2049 ucred->uc_valid = UCRED_INIT;
/* lu_env context/session tags used when refilling envs for MD echo ops */
2052 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
2053 #define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
/*
 * Dispatch one OBD_IOC_ECHO_MD sub-command (@command): set up an MD
 * thread env, allocate the big LOV buffer, resolve @path to the parent
 * object, optionally copy the child name from userspace, assume caller
 * credentials, and run the create/destroy/lookup/getattr/setattr op.
 * All resources are released in reverse order on the way out.
 */
2054 static int echo_md_handler(struct echo_device *ed, int command,
2055 char *path, int path_len, __u64 id, int count,
2056 struct obd_ioctl_data *data)
2058 struct echo_thread_info *info;
2059 struct lu_device *ld = ed->ed_next;
2062 struct lu_object *parent;
/* optional child name length comes in via ioc_plen2 */
2064 int namelen = data->ioc_plen2;
2069 CERROR("MD echo client is not being initialized properly\n");
2073 if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2074 CERROR("Only support MDD layer right now!\n");
2078 env = cl_env_get(&refcheck);
2080 RETURN(PTR_ERR(env));
/* widen the env's contexts to carry MD thread + server session state */
2082 rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
2086 /* init big_lmm buffer */
2087 info = echo_env_info(env);
2088 LASSERT(info->eti_big_lmm == NULL);
2089 OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2090 if (info->eti_big_lmm == NULL)
2091 GOTO(out_env, rc = -ENOMEM);
2092 info->eti_big_lmmsize = MIN_MD_SIZE;
2094 parent = echo_resolve_path(env, ed, path, path_len);
2095 if (IS_ERR(parent)) {
2096 CERROR("Can not resolve the path %s: rc = %ld\n", path,
2098 GOTO(out_free, rc = PTR_ERR(parent));
2102 OBD_ALLOC(name, namelen + 1);
2104 GOTO(out_put, rc = -ENOMEM);
2105 if (copy_from_user(name, data->ioc_pbuf2, namelen))
2106 GOTO(out_name, rc = -EFAULT);
2109 echo_ucred_init(env);
2112 case ECHO_MD_CREATE:
2113 case ECHO_MD_MKDIR: {
2114 struct echo_thread_info *info = echo_env_info(env);
2115 __u32 mode = data->ioc_obdo2.o_mode;
2116 struct lu_fid *fid = &info->eti_fid;
2117 int stripe_count = (int)data->ioc_obdo2.o_misc;
2118 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2120 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2124 /* In the function below, .hs_keycmp resolves to
2125 * lu_obj_hop_keycmp() */
2126 /* coverity[overrun-buffer-val] */
2127 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2128 id, mode, count, stripe_count,
2132 case ECHO_MD_DESTROY:
2133 case ECHO_MD_RMDIR: {
2134 __u32 mode = data->ioc_obdo2.o_mode;
2136 rc = echo_destroy_object(env, ed, parent, name, namelen,
2140 case ECHO_MD_LOOKUP:
2141 rc = echo_lookup_object(env, ed, parent, id, count);
2143 case ECHO_MD_GETATTR:
2144 rc = echo_getattr_object(env, ed, parent, id, count);
2146 case ECHO_MD_SETATTR:
2147 rc = echo_setattr_object(env, ed, parent, id, count);
2150 CERROR("unknown command %d\n", command);
/* teardown: credentials, name buffer, parent ref, big buffer, env */
2154 echo_ucred_fini(env);
2158 OBD_FREE(name, namelen + 1);
2160 lu_object_put(env, parent);
2162 LASSERT(info->eti_big_lmm);
2163 OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2164 info->eti_big_lmm = NULL;
2165 info->eti_big_lmmsize = 0;
2167 cl_env_put(env, &refcheck);
2170 #endif /* HAVE_SERVER_SUPPORT */
/*
 * Create an echo data object from @oa: validate id/group (must be in the
 * echo FID sequence), auto-assign an id from last_object_id when 0,
 * obd_create() it and verify it is findable; the object is destroyed
 * again if the post-create lookup fails.
 */
2172 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2173 struct obdo *oa, struct obd_trans_info *oti)
2175 struct echo_object *eco;
2176 struct echo_client_obd *ec = ed->ed_ec;
2181 if (!(oa->o_valid & OBD_MD_FLID) ||
2182 !(oa->o_valid & OBD_MD_FLGROUP) ||
2183 !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
2184 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
/* id 0 means "pick one for me" */
2188 if (ostid_id(&oa->o_oi) == 0)
2189 ostid_set_id(&oa->o_oi, ++last_object_id);
2191 rc = obd_create(env, ec->ec_exp, oa, oti);
2193 CERROR("Cannot create objects: rc = %d\n", rc);
2199 oa->o_valid |= OBD_MD_FLID;
/* sanity: the freshly created object must be locatable */
2201 eco = cl_echo_object_find(ed, &oa->o_oi);
2203 GOTO(failed, rc = PTR_ERR(eco));
2204 cl_echo_object_put(eco);
2206 CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
/* undo the create if anything after it failed */
2210 if (created && rc != 0)
2211 obd_destroy(env, ec->ec_exp, oa, oti);
2214 CERROR("create object failed with: rc = %d\n", rc);
/*
 * Resolve @oa (must carry a valid non-zero id and group) to a referenced
 * echo_object in *ecop; pair with echo_put_object().
 */
2219 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2222 struct echo_object *eco;
2226 if (!(oa->o_valid & OBD_MD_FLID) ||
2227 !(oa->o_valid & OBD_MD_FLGROUP) ||
2228 ostid_id(&oa->o_oi) == 0) {
2229 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2234 eco = cl_echo_object_find(ed, &oa->o_oi);
/* Drop a reference taken by echo_get_object(); failure is only logged. */
2243 static void echo_put_object(struct echo_object *eco)
2247 rc = cl_echo_object_put(eco);
2249 CERROR("%s: echo client drop an object failed: rc = %d\n",
2250 eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
/*
 * Stamp verification patterns into every OBD_ECHO_BLOCK_SIZE block of a
 * full page: real (id, offset) values for writes, poison values for
 * reads so the server's data is detectable.
 */
2253 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
2254 u64 offset, u64 count)
2261 /* no partial pages on the client */
2262 LASSERT(count == PAGE_CACHE_SIZE);
2266 for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2267 if (rw == OBD_BRW_WRITE) {
2268 stripe_off = offset + delta;
/* read path: poison so stale data is obvious */
2271 stripe_off = 0xdeadbeef00c0ffeeULL;
2272 stripe_id = 0xdeadbeef00c0ffeeULL;
2274 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2275 stripe_off, stripe_id);
/*
 * Verify the debug pattern in every block of a full page; returns the
 * first block_debug_check() error (logged per failing object id).
 */
2282 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
2291 /* no partial pages on the client */
2292 LASSERT(count == PAGE_CACHE_SIZE)
2296 for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2297 stripe_off = offset + delta;
2300 rc2 = block_debug_check("test_brw",
2301 addr + delta, OBD_ECHO_BLOCK_SIZE,
2302 stripe_off, stripe_id);
2304 CERROR ("Error in echo object "LPX64"\n", id);
/*
 * Kernel-buffer bulk I/O path: allocate @count/PAGE_SIZE pages, stamp
 * debug patterns if OBD_FL_DEBUG_CHECK is set, run cl_echo_object_brw(),
 * verify read data, and free everything. Only whole pages are accepted.
 */
2313 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2314 struct echo_object *eco, u64 offset,
2315 u64 count, int async,
2316 struct obd_trans_info *oti)
2319 struct brw_page *pga;
2320 struct brw_page *pgp;
2321 struct page **pages;
/* verify only when debug-check was requested and not the persistent obj */
2330 verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2331 (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2332 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
/* alternate allocation zones based on the object id's bit 1 */
2334 gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
2336 LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2338 if ((count & (~CFS_PAGE_MASK)) != 0)
2341 /* XXX think again with misaligned I/O */
2342 npages = count >> PAGE_CACHE_SHIFT;
2344 if (rw == OBD_BRW_WRITE)
2345 brw_flags = OBD_BRW_ASYNC;
2347 OBD_ALLOC(pga, npages * sizeof(*pga));
2351 OBD_ALLOC(pages, npages * sizeof(*pages));
2352 if (pages == NULL) {
2353 OBD_FREE(pga, npages * sizeof(*pga));
2357 for (i = 0, pgp = pga, off = offset;
2359 i++, pgp++, off += PAGE_CACHE_SIZE) {
2361 LASSERT (pgp->pg == NULL); /* for cleanup */
2364 OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
2365 if (pgp->pg == NULL)
2369 pgp->count = PAGE_CACHE_SIZE;
2371 pgp->flag = brw_flags;
2374 echo_client_page_debug_setup(pgp->pg, rw,
2375 ostid_id(&oa->o_oi), off,
2379 /* brw mode can only be used at client */
2380 LASSERT(ed->ed_next != NULL);
2381 rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
/* verification is only meaningful for successful reads */
2384 if (rc != 0 || rw != OBD_BRW_READ)
2387 for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2388 if (pgp->pg == NULL)
2393 vrc = echo_client_page_debug_check(pgp->pg,
2394 ostid_id(&oa->o_oi),
2395 pgp->off, pgp->count);
/* keep the first verification error, free pages regardless */
2396 if (vrc != 0 && rc == 0)
2399 OBD_PAGE_FREE(pgp->pg);
2401 OBD_FREE(pga, npages * sizeof(*pga));
2402 OBD_FREE(pages, npages * sizeof(*pages));
/*
 * Server-side bulk path: drive obd_preprw()/obd_commitrw() directly in
 * batches of @batch bytes until @count bytes are done, with optional
 * debug-pattern stamping/checking per page. The env context is
 * exited/re-entered between batches to recycle per-batch state.
 */
2406 static int echo_client_prep_commit(const struct lu_env *env,
2407 struct obd_export *exp, int rw,
2408 struct obdo *oa, struct echo_object *eco,
2409 u64 offset, u64 count,
2410 u64 batch, struct obd_trans_info *oti,
2413 struct obd_ioobj ioo;
2414 struct niobuf_local *lnb;
2415 struct niobuf_remote *rnb;
2417 u64 npages, tot_pages;
2418 int i, ret = 0, brw_flags = 0;
/* whole pages only */
2422 if (count <= 0 || (count & ~PAGE_CACHE_MASK) != 0)
2425 npages = batch >> PAGE_CACHE_SHIFT;
2426 tot_pages = count >> PAGE_CACHE_SHIFT;
2428 OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
2429 OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
2431 if (lnb == NULL || rnb == NULL)
2432 GOTO(out, ret = -ENOMEM);
2434 if (rw == OBD_BRW_WRITE && async)
2435 brw_flags |= OBD_BRW_ASYNC;
2437 obdo_to_ioobj(oa, &ioo);
/* batch loop: last iteration may be short */
2441 for(; tot_pages; tot_pages -= npages) {
2444 if (tot_pages < npages)
2447 for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
2448 rnb[i].rnb_offset = off;
2449 rnb[i].rnb_len = PAGE_CACHE_SIZE;
2450 rnb[i].rnb_flags = brw_flags;
2453 ioo.ioo_bufcnt = npages;
2456 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
2460 LASSERT(lpages == npages);
2462 for (i = 0; i < lpages; i++) {
2463 struct page *page = lnb[i].lnb_page;
2465 /* read past eof? */
2466 if (page == NULL && lnb[i].lnb_rc == 0)
2470 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
/* skip pattern work unless debug-check requested on a normal object */
2472 if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2473 (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2474 (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2477 if (rw == OBD_BRW_WRITE)
2478 echo_client_page_debug_setup(page, rw,
2479 ostid_id(&oa->o_oi),
2483 echo_client_page_debug_check(page,
2484 ostid_id(&oa->o_oi),
2489 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
2490 rnb, npages, lnb, oti, ret);
2494 /* Reset oti otherwise it would confuse ldiskfs. */
2495 memset(oti, 0, sizeof(*oti));
2497 /* Reuse env context. */
2498 lu_context_exit((struct lu_context *)&env->le_ctx);
2499 lu_context_enter((struct lu_context *)&env->le_ctx);
2504 OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
2506 OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
/*
 * OBD_IOC_BRW_READ/WRITE dispatcher: resolve the target echo object and
 * route to the kernel-buffer brw path or the prep/commit path depending
 * on the test mode in ioc_pbuf1. The batch size (ioc_plen1) is clamped
 * to PTLRPC_MAX_BRW_SIZE.
 */
2510 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2511 struct obd_export *exp,
2512 struct obd_ioctl_data *data,
2513 struct obd_trans_info *dummy_oti)
2515 struct obd_device *obd = class_exp2obd(exp);
2516 struct echo_device *ed = obd2echo_dev(obd);
2517 struct echo_client_obd *ec = ed->ed_ec;
2518 struct obdo *oa = &data->ioc_obdo1;
2519 struct echo_object *eco;
2525 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2527 rc = echo_get_object(&eco, ed, oa);
2531 oa->o_valid &= ~OBD_MD_FLHANDLE;
2533 /* OFD/obdfilter works only via prep/commit */
2534 test_mode = (long)data->ioc_pbuf1;
2535 if (ed->ed_next == NULL && test_mode != 3) {
2537 data->ioc_plen1 = data->ioc_count;
2543 /* Truncate batch size to maximum */
2544 if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2545 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2547 switch (test_mode) {
2551 rc = echo_client_kbrw(ed, rw, oa,
2552 eco, data->ioc_offset,
2553 data->ioc_count, async, dummy_oti);
2556 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
2557 eco, data->ioc_offset,
2558 data->ioc_count, data->ioc_plen1,
2564 echo_put_object(eco);
/*
 * Main ioctl entry for the echo client device. Sets up a DT-thread
 * lu_env (plus a server session under HAVE_SERVER_SUPPORT), normalizes
 * the obdo's group/FID, then dispatches on @cmd: object create/destroy,
 * getattr/setattr, MD echo sub-commands, sequence allocation and bulk
 * read/write. Privileged commands require CFS_CAP_SYS_ADMIN. Any ack
 * locks accumulated in the dummy oti are released before returning.
 */
2569 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2570 void *karg, void *uarg)
2572 #ifdef HAVE_SERVER_SUPPORT
2573 struct tgt_session_info *tsi;
2575 struct obd_device *obd = exp->exp_obd;
2576 struct echo_device *ed = obd2echo_dev(obd);
2577 struct echo_client_obd *ec = ed->ed_ec;
2578 struct echo_object *eco;
2579 struct obd_ioctl_data *data = karg;
2580 struct obd_trans_info dummy_oti;
2582 struct oti_req_ack_lock *ack_lock;
2585 int rw = OBD_BRW_READ;
2588 #ifdef HAVE_SERVER_SUPPORT
2589 struct lu_context echo_session;
2593 memset(&dummy_oti, 0, sizeof(dummy_oti));
/* default to the echo sequence when the caller didn't set a group */
2595 oa = &data->ioc_obdo1;
2596 if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2597 oa->o_valid |= OBD_MD_FLGROUP;
2598 ostid_set_seq_echo(&oa->o_oi);
2601 /* This FID is unpacked just for validation at this point */
2602 rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2610 rc = lu_env_init(env, LCT_DT_THREAD);
2612 GOTO(out_alloc, rc = -ENOMEM);
2614 #ifdef HAVE_SERVER_SUPPORT
2615 env->le_ses = &echo_session;
2616 rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
2617 if (unlikely(rc < 0))
2619 lu_context_enter(env->le_ses);
2621 tsi = tgt_ses_info(env);
2622 tsi->tsi_exp = ec->ec_exp;
2623 tsi->tsi_jobid = NULL;
2626 case OBD_IOC_CREATE: /* may create echo object */
2627 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2628 GOTO (out, rc = -EPERM);
2630 rc = echo_create_object(env, ed, oa, &dummy_oti);
2633 #ifdef HAVE_SERVER_SUPPORT
2634 case OBD_IOC_ECHO_MD: {
2641 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2642 GOTO(out, rc = -EPERM);
2644 count = data->ioc_count;
2645 cmd = data->ioc_command;
2647 id = data->ioc_obdo2.o_oi.oi.oi_id;
2648 dirlen = data->ioc_plen1;
2649 OBD_ALLOC(dir, dirlen + 1);
2651 GOTO(out, rc = -ENOMEM);
2653 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2654 OBD_FREE(dir, data->ioc_plen1 + 1);
2655 GOTO(out, rc = -EFAULT);
2658 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2659 OBD_FREE(dir, dirlen + 1);
2662 case OBD_IOC_ECHO_ALLOC_SEQ: {
2663 struct lu_env *cl_env;
2668 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2669 GOTO(out, rc = -EPERM);
2671 cl_env = cl_env_get(&refcheck);
2673 GOTO(out, rc = PTR_ERR(cl_env));
2675 rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
2678 cl_env_put(cl_env, &refcheck);
2682 rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
2683 cl_env_put(cl_env, &refcheck);
2685 CERROR("%s: Can not alloc seq: rc = %d\n",
/* return the new sequence and its max width to userspace */
2690 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2693 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2694 if (copy_to_user(data->ioc_pbuf2, &max_count,
2699 #endif /* HAVE_SERVER_SUPPORT */
2700 case OBD_IOC_DESTROY:
2701 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2702 GOTO (out, rc = -EPERM);
2704 rc = echo_get_object(&eco, ed, oa);
2706 rc = obd_destroy(env, ec->ec_exp, oa, &dummy_oti);
2708 eco->eo_deleted = 1;
2709 echo_put_object(eco);
2713 case OBD_IOC_GETATTR:
2714 rc = echo_get_object(&eco, ed, oa);
2716 struct obd_info oinfo = {
2720 rc = obd_getattr(env, ec->ec_exp, &oinfo);
2721 echo_put_object(eco);
2725 case OBD_IOC_SETATTR:
2726 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2727 GOTO (out, rc = -EPERM);
2729 rc = echo_get_object(&eco, ed, oa);
2731 struct obd_info oinfo = {
2735 rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
2736 echo_put_object(eco);
2740 case OBD_IOC_BRW_WRITE:
2741 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2742 GOTO (out, rc = -EPERM);
/* falls through to the shared brw handler with rw switched to write */
2746 case OBD_IOC_BRW_READ:
2747 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
2751 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2752 GOTO (out, rc = -ENOTTY);
2757 #ifdef HAVE_SERVER_SUPPORT
2758 lu_context_exit(env->le_ses);
2759 lu_context_fini(env->le_ses);
2766 /* XXX this should be in a helper also called by target_send_reply */
2767 for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
2769 if (!ack_lock->mode)
2771 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
/*
 * Device setup: resolve the target OBD named in the config, initialize
 * the client state lists/lock, update env tags for MDT targets (server
 * builds only), then obd_connect() to the target with echo-specific
 * connect flags and unhook the export from the pinger.
 */
2777 static int echo_client_setup(const struct lu_env *env,
2778 struct obd_device *obddev, struct lustre_cfg *lcfg)
2780 struct echo_client_obd *ec = &obddev->u.echo_client;
2781 struct obd_device *tgt;
2782 struct obd_uuid echo_uuid = { "ECHO_UUID" };
2783 struct obd_connect_data *ocd = NULL;
2787 if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2788 CERROR("requires a TARGET OBD name\n");
2792 tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2793 if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2794 CERROR("device not attached or not set up (%s)\n",
2795 lustre_cfg_string(lcfg, 1));
2799 spin_lock_init(&ec->ec_lock);
2800 INIT_LIST_HEAD(&ec->ec_objects);
2801 INIT_LIST_HEAD(&ec->ec_locks);
2804 if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2805 #ifdef HAVE_SERVER_SUPPORT
2806 lu_context_tags_update(ECHO_MD_CTX_TAG);
2807 lu_session_tags_update(ECHO_MD_SES_TAG);
2809 CERROR("Local operations are NOT supported on client side. "
2810 "Only remote operations are supported. Metadata client "
2811 "must be run on server side.\n");
2816 OBD_ALLOC(ocd, sizeof(*ocd));
2818 CERROR("Can't alloc ocd connecting to %s\n",
2819 lustre_cfg_string(lcfg, 1));
2823 ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2824 OBD_CONNECT_BRW_SIZE |
2825 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2826 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2828 ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2829 ocd->ocd_version = LUSTRE_VERSION_CODE;
2830 ocd->ocd_group = FID_SEQ_ECHO;
2832 rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2834 /* Turn off pinger because it connects to tgt obd directly. */
2835 spin_lock(&tgt->obd_dev_lock);
2836 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2837 spin_unlock(&tgt->obd_dev_lock);
2840 OBD_FREE(ocd, sizeof(*ocd));
2843 CERROR("fail to connect to device %s\n",
2844 lustre_cfg_string(lcfg, 1));
/*
 * Device teardown: reverse of echo_client_setup() — clear MD env tags
 * for metadata devices, refuse to clean up while exports remain, then
 * disconnect from the target.
 */
2851 static int echo_client_cleanup(struct obd_device *obddev)
2853 struct echo_device *ed = obd2echo_dev(obddev);
2854 struct echo_client_obd *ec = &obddev->u.echo_client;
2858 /*Do nothing for Metadata echo client*/
2862 if (ed->ed_next_ismd) {
2863 #ifdef HAVE_SERVER_SUPPORT
2864 lu_context_tags_clear(ECHO_MD_CTX_TAG);
2865 lu_session_tags_clear(ECHO_MD_SES_TAG);
2867 CERROR("This is client-side only module, does not support "
2868 "metadata echo client.\n");
2873 if (!list_empty(&obddev->obd_exports)) {
2874 CERROR("still has clients!\n");
2878 LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2879 rc = obd_disconnect(ec->ec_exp);
2881 CERROR("fail to disconnect device: %d\n", rc);
/* Minimal connect handler: class_connect() then convert the handle to
 * an export for the caller. */
2886 static int echo_client_connect(const struct lu_env *env,
2887 struct obd_export **exp,
2888 struct obd_device *src, struct obd_uuid *cluuid,
2889 struct obd_connect_data *data, void *localdata)
2892 struct lustre_handle conn = { 0 };
2895 rc = class_connect(&conn, src, cluuid);
2897 *exp = class_conn2export(&conn);
/* Minimal disconnect handler: validate the export, class_disconnect(). */
2903 static int echo_client_disconnect(struct obd_export *exp)
2909 GOTO(out, rc = -EINVAL);
2911 rc = class_disconnect(exp);
/* OBD method table registered for the echo client device type. */
2917 static struct obd_ops echo_client_obd_ops = {
2918 .o_owner = THIS_MODULE,
2919 .o_iocontrol = echo_client_iocontrol,
2920 .o_connect = echo_client_connect,
2921 .o_disconnect = echo_client_disconnect
/* Register echo client caches and OBD type; caches are torn down if
 * type registration fails. */
2924 static int echo_client_init(void)
2928 rc = lu_kmem_init(echo_caches);
2930 rc = class_register_type(&echo_client_obd_ops, NULL, true, NULL,
2931 LUSTRE_ECHO_CLIENT_NAME,
2934 lu_kmem_fini(echo_caches);
/* Unregister the echo client OBD type and release its caches. */
2939 static void echo_client_exit(void)
2941 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2942 lu_kmem_fini(echo_caches);
/*
 * Module init: set up persistent pages and the echo server type (server
 * builds only), then the echo client; unwinds in reverse on failure.
 */
2945 static int __init obdecho_init(void)
2950 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
/* debug-pattern code assumes pages divide evenly into echo blocks */
2952 LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2954 # ifdef HAVE_SERVER_SUPPORT
2955 rc = echo_persistent_pages_init();
2959 rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
2960 LUSTRE_ECHO_NAME, NULL);
2965 rc = echo_client_init();
2967 # ifdef HAVE_SERVER_SUPPORT
2971 class_unregister_type(LUSTRE_ECHO_NAME);
2973 echo_persistent_pages_fini();
/* Module exit: tear down server-side type and persistent pages. */
2979 static void /*__exit*/ obdecho_exit(void)
2983 # ifdef HAVE_SERVER_SUPPORT
2984 class_unregister_type(LUSTRE_ECHO_NAME);
2985 echo_persistent_pages_fini();
/* Kernel module metadata and init/exit registration. */
2989 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2990 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
2991 MODULE_LICENSE("GPL");
2993 cfs_module(obdecho, LUSTRE_VERSION_STRING, obdecho_init, obdecho_exit);
2995 /** @} echo_client */