4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_ECHO
35 #include <linux/user_namespace.h>
36 #ifdef HAVE_UIDGID_HEADER
37 # include <linux/uidgid.h>
39 #include <libcfs/libcfs.h>
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <lustre_debug.h>
45 #include <lprocfs_status.h>
46 #include <cl_object.h>
47 #include <lustre_fid.h>
48 #include <lustre_lmv.h>
49 #include <lustre_acl.h>
50 #include <uapi/linux/lustre/lustre_ioctl.h>
51 #include <lustre_net.h>
52 #ifdef HAVE_SERVER_SUPPORT
53 # include <md_object.h>
55 #define ETI_NAME_LEN 20
57 #endif /* HAVE_SERVER_SUPPORT */
59 #include "echo_internal.h"
61 /** \defgroup echo_client Echo Client
65 /* The echo thread key carries a CL_THREAD tag, so cl_env functions can use it directly */
66 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
67 #define ECHO_DT_CTX_TAG (LCT_REMEMBER | LCT_DT_THREAD)
68 #define ECHO_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
71 struct cl_device ed_cl;
72 struct echo_client_obd *ed_ec;
74 struct cl_site ed_site_myself;
75 struct lu_site *ed_site;
76 struct lu_device *ed_next;
78 struct lu_client_seq *ed_cl_seq;
79 #ifdef HAVE_SERVER_SUPPORT
80 struct local_oid_storage *ed_los;
81 struct lu_fid ed_root_fid;
82 #endif /* HAVE_SERVER_SUPPORT */
86 struct cl_object eo_cl;
87 struct cl_object_header eo_hdr;
88 struct echo_device *eo_dev;
89 struct list_head eo_obj_chain;
90 struct lov_oinfo *eo_oinfo;
95 struct echo_object_conf {
96 struct cl_object_conf eoc_cl;
97 struct lov_oinfo **eoc_oinfo;
101 struct cl_page_slice ep_cl;
102 unsigned long ep_lock;
106 struct cl_lock_slice el_cl;
107 struct list_head el_chain;
108 struct echo_object *el_object;
110 atomic_t el_refcount;
113 #ifdef HAVE_SERVER_SUPPORT
114 static const char echo_md_root_dir_name[] = "ROOT_ECHO";
117 * In order to use the values of members in struct mdd_device,
118 * we define an alias structure here.
120 struct echo_md_device {
121 struct md_device emd_md_dev;
122 struct obd_export *emd_child_exp;
123 struct dt_device *emd_child;
124 struct dt_device *emd_bottom;
125 struct lu_fid emd_root_fid;
126 struct lu_fid emd_local_root_fid;
128 #endif /* HAVE_SERVER_SUPPORT */
130 static int echo_client_setup(const struct lu_env *env,
131 struct obd_device *obddev,
132 struct lustre_cfg *lcfg);
133 static int echo_client_cleanup(struct obd_device *obddev);
135 /** \defgroup echo_helpers Helper functions
138 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
140 return container_of0(dev, struct echo_device, ed_cl);
143 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
148 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
150 return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
153 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
158 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
160 return container_of(o, struct echo_object, eo_cl);
163 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
165 return container_of(s, struct echo_page, ep_cl);
168 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
170 return container_of(s, struct echo_lock, el_cl);
173 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
175 return ecl->el_cl.cls_lock;
178 static struct lu_context_key echo_thread_key;
180 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
182 struct echo_thread_info *info;
184 info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
185 LASSERT(info != NULL);
190 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
192 return container_of(c, struct echo_object_conf, eoc_cl);
195 #ifdef HAVE_SERVER_SUPPORT
196 static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
198 return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
201 static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
203 return &d->emd_md_dev.md_lu_dev;
206 static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
208 return emd2lu_dev(d)->ld_site->ld_seq_site;
211 static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
213 return d->emd_md_dev.md_lu_dev.ld_obd;
215 #endif /* HAVE_SERVER_SUPPORT */
217 /** @} echo_helpers */
219 static int cl_echo_object_put(struct echo_object *eco);
220 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
221 struct page **pages, int npages, int async);
223 struct echo_thread_info {
224 struct echo_object_conf eti_conf;
225 struct lustre_md eti_md;
226 struct cl_2queue eti_queue;
228 struct cl_lock eti_lock;
229 struct lu_fid eti_fid;
230 struct lu_fid eti_fid2;
231 #ifdef HAVE_SERVER_SUPPORT
232 struct md_op_spec eti_spec;
233 struct lov_mds_md_v3 eti_lmm;
234 struct lov_user_md_v3 eti_lum;
235 struct md_attr eti_ma;
236 struct lu_name eti_lname;
237 /* per-thread values, can be re-used */
238 void *eti_big_lmm; /* may be vmalloc'd */
240 char eti_name[ETI_NAME_LEN];
241 struct lu_buf eti_buf;
242 /* If we want to test large ACL, then need to enlarge the buffer. */
243 char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE_OLD];
247 /* No session used right now */
248 struct echo_session_info {
252 static struct kmem_cache *echo_lock_kmem;
253 static struct kmem_cache *echo_object_kmem;
254 static struct kmem_cache *echo_thread_kmem;
255 static struct kmem_cache *echo_session_kmem;
256 /* static struct kmem_cache *echo_req_kmem; */
258 static struct lu_kmem_descr echo_caches[] = {
260 .ckd_cache = &echo_lock_kmem,
261 .ckd_name = "echo_lock_kmem",
262 .ckd_size = sizeof(struct echo_lock)
265 .ckd_cache = &echo_object_kmem,
266 .ckd_name = "echo_object_kmem",
267 .ckd_size = sizeof(struct echo_object)
270 .ckd_cache = &echo_thread_kmem,
271 .ckd_name = "echo_thread_kmem",
272 .ckd_size = sizeof(struct echo_thread_info)
275 .ckd_cache = &echo_session_kmem,
276 .ckd_name = "echo_session_kmem",
277 .ckd_size = sizeof(struct echo_session_info)
284 /** \defgroup echo_page Page operations
286 * Echo page operations.
290 static int echo_page_own(const struct lu_env *env,
291 const struct cl_page_slice *slice,
292 struct cl_io *io, int nonblock)
294 struct echo_page *ep = cl2echo_page(slice);
297 if (test_and_set_bit(0, &ep->ep_lock))
300 while (test_and_set_bit(0, &ep->ep_lock))
301 wait_on_bit(&ep->ep_lock, 0, TASK_UNINTERRUPTIBLE);
306 static void echo_page_disown(const struct lu_env *env,
307 const struct cl_page_slice *slice,
310 struct echo_page *ep = cl2echo_page(slice);
312 LASSERT(test_bit(0, &ep->ep_lock));
313 clear_and_wake_up_bit(0, &ep->ep_lock);
316 static void echo_page_discard(const struct lu_env *env,
317 const struct cl_page_slice *slice,
318 struct cl_io *unused)
320 cl_page_delete(env, slice->cpl_page);
323 static int echo_page_is_vmlocked(const struct lu_env *env,
324 const struct cl_page_slice *slice)
326 if (test_bit(0, &cl2echo_page(slice)->ep_lock))
331 static void echo_page_completion(const struct lu_env *env,
332 const struct cl_page_slice *slice,
335 LASSERT(slice->cpl_page->cp_sync_io != NULL);
338 static void echo_page_fini(const struct lu_env *env,
339 struct cl_page_slice *slice,
340 struct pagevec *pvec)
342 struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
345 atomic_dec(&eco->eo_npages);
346 put_page(slice->cpl_page->cp_vmpage);
350 static int echo_page_prep(const struct lu_env *env,
351 const struct cl_page_slice *slice,
352 struct cl_io *unused)
357 static int echo_page_print(const struct lu_env *env,
358 const struct cl_page_slice *slice,
359 void *cookie, lu_printer_t printer)
361 struct echo_page *ep = cl2echo_page(slice);
363 (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
364 ep, test_bit(0, &ep->ep_lock),
365 slice->cpl_page->cp_vmpage);
369 static const struct cl_page_operations echo_page_ops = {
370 .cpo_own = echo_page_own,
371 .cpo_disown = echo_page_disown,
372 .cpo_discard = echo_page_discard,
373 .cpo_fini = echo_page_fini,
374 .cpo_print = echo_page_print,
375 .cpo_is_vmlocked = echo_page_is_vmlocked,
378 .cpo_prep = echo_page_prep,
379 .cpo_completion = echo_page_completion,
382 .cpo_prep = echo_page_prep,
383 .cpo_completion = echo_page_completion,
390 /** \defgroup echo_lock Locking
392 * echo lock operations
396 static void echo_lock_fini(const struct lu_env *env,
397 struct cl_lock_slice *slice)
399 struct echo_lock *ecl = cl2echo_lock(slice);
401 LASSERT(list_empty(&ecl->el_chain));
402 OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
405 static struct cl_lock_operations echo_lock_ops = {
406 .clo_fini = echo_lock_fini,
411 /** \defgroup echo_cl_ops cl_object operations
413 * operations for cl_object
417 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
418 struct cl_page *page, pgoff_t index)
420 struct echo_page *ep = cl_object_page_slice(obj, page);
421 struct echo_object *eco = cl2echo_obj(obj);
424 get_page(page->cp_vmpage);
426 * ep_lock is similar to the lock_page() lock, and
427 * cannot usefully be monitored by lockdep.
428 * So just use a bit in an "unsigned long" and use the
429 * wait_on_bit() interface to wait for the bit to be clear.
432 cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
433 atomic_inc(&eco->eo_npages);
437 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
443 static int echo_lock_init(const struct lu_env *env,
444 struct cl_object *obj, struct cl_lock *lock,
445 const struct cl_io *unused)
447 struct echo_lock *el;
450 OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
452 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
453 el->el_object = cl2echo_obj(obj);
454 INIT_LIST_HEAD(&el->el_chain);
455 atomic_set(&el->el_refcount, 0);
457 RETURN(el ? 0 : -ENOMEM);
460 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
461 const struct cl_object_conf *conf)
466 static const struct cl_object_operations echo_cl_obj_ops = {
467 .coo_page_init = echo_page_init,
468 .coo_lock_init = echo_lock_init,
469 .coo_io_init = echo_io_init,
470 .coo_conf_set = echo_conf_set
472 /** @} echo_cl_ops */
474 /** \defgroup echo_lu_ops lu_object operations
476 * operations for echo lu object.
480 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
481 const struct lu_object_conf *conf)
483 struct echo_device *ed = cl2echo_dev(lu2cl_dev(obj->lo_dev));
484 struct echo_client_obd *ec = ed->ed_ec;
485 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
489 struct lu_object *below;
490 struct lu_device *under;
493 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
497 lu_object_add(obj, below);
500 if (!ed->ed_next_ismd) {
501 const struct cl_object_conf *cconf = lu2cl_conf(conf);
502 struct echo_object_conf *econf = cl2echo_conf(cconf);
504 LASSERT(econf->eoc_oinfo != NULL);
507 * Transfer the oinfo pointer to eco that it won't be
510 eco->eo_oinfo = *econf->eoc_oinfo;
511 *econf->eoc_oinfo = NULL;
513 eco->eo_oinfo = NULL;
517 atomic_set(&eco->eo_npages, 0);
518 cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
520 spin_lock(&ec->ec_lock);
521 list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
522 spin_unlock(&ec->ec_lock);
527 static void echo_object_delete(const struct lu_env *env, struct lu_object *obj)
529 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
530 struct echo_client_obd *ec;
534 /* object delete called unconditolally - layer init or not */
535 if (eco->eo_dev == NULL)
538 ec = eco->eo_dev->ed_ec;
540 LASSERT(atomic_read(&eco->eo_npages) == 0);
542 spin_lock(&ec->ec_lock);
543 list_del_init(&eco->eo_obj_chain);
544 spin_unlock(&ec->ec_lock);
547 OBD_FREE_PTR(eco->eo_oinfo);
550 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
552 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
557 lu_object_header_fini(obj->lo_header);
559 OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
563 static int echo_object_print(const struct lu_env *env, void *cookie,
564 lu_printer_t p, const struct lu_object *o)
566 struct echo_object *obj = cl2echo_obj(lu2cl(o));
568 return (*p)(env, cookie, "echoclient-object@%p", obj);
571 static const struct lu_object_operations echo_lu_obj_ops = {
572 .loo_object_init = echo_object_init,
573 .loo_object_delete = echo_object_delete,
574 .loo_object_release = NULL,
575 .loo_object_free = echo_object_free,
576 .loo_object_print = echo_object_print,
577 .loo_object_invariant = NULL
579 /** @} echo_lu_ops */
581 /** \defgroup echo_lu_dev_ops lu_device operations
583 * Operations for echo lu device.
587 static struct lu_object *echo_object_alloc(const struct lu_env *env,
588 const struct lu_object_header *hdr,
589 struct lu_device *dev)
591 struct echo_object *eco;
592 struct lu_object *obj = NULL;
595 /* we're the top dev. */
596 LASSERT(hdr == NULL);
597 OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
599 struct cl_object_header *hdr = &eco->eo_hdr;
601 obj = &echo_obj2cl(eco)->co_lu;
602 cl_object_header_init(hdr);
603 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
605 lu_object_init(obj, &hdr->coh_lu, dev);
606 lu_object_add_top(&hdr->coh_lu, obj);
608 eco->eo_cl.co_ops = &echo_cl_obj_ops;
609 obj->lo_ops = &echo_lu_obj_ops;
614 static struct lu_device_operations echo_device_lu_ops = {
615 .ldo_object_alloc = echo_object_alloc,
618 /** @} echo_lu_dev_ops */
620 /** \defgroup echo_init Setup and teardown
622 * Init and fini functions for echo client.
626 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
628 struct cl_site *site = &ed->ed_site_myself;
631 /* initialize site */
632 rc = cl_site_init(site, &ed->ed_cl);
634 CERROR("Cannot initialize site for echo client(%d)\n", rc);
638 rc = lu_site_init_finish(&site->cs_lu);
644 ed->ed_site = &site->cs_lu;
648 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
651 if (!ed->ed_next_ismd)
652 lu_site_fini(ed->ed_site);
657 static void *echo_thread_key_init(const struct lu_context *ctx,
658 struct lu_context_key *key)
660 struct echo_thread_info *info;
662 OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
664 info = ERR_PTR(-ENOMEM);
668 static void echo_thread_key_fini(const struct lu_context *ctx,
669 struct lu_context_key *key, void *data)
671 struct echo_thread_info *info = data;
673 OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
676 static struct lu_context_key echo_thread_key = {
677 .lct_tags = LCT_CL_THREAD,
678 .lct_init = echo_thread_key_init,
679 .lct_fini = echo_thread_key_fini,
682 static void *echo_session_key_init(const struct lu_context *ctx,
683 struct lu_context_key *key)
685 struct echo_session_info *session;
687 OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
689 session = ERR_PTR(-ENOMEM);
693 static void echo_session_key_fini(const struct lu_context *ctx,
694 struct lu_context_key *key, void *data)
696 struct echo_session_info *session = data;
698 OBD_SLAB_FREE_PTR(session, echo_session_kmem);
701 static struct lu_context_key echo_session_key = {
702 .lct_tags = LCT_SESSION,
703 .lct_init = echo_session_key_init,
704 .lct_fini = echo_session_key_fini,
707 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
709 #ifdef HAVE_SERVER_SUPPORT
710 # define ECHO_SEQ_WIDTH 0xffffffff
711 static int echo_fid_init(struct echo_device *ed, char *obd_name,
712 struct seq_server_site *ss)
718 OBD_ALLOC_PTR(ed->ed_cl_seq);
722 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
724 GOTO(out_free_seq, rc = -ENOMEM);
726 snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
728 /* Init client side sequence-manager */
729 rc = seq_client_init(ed->ed_cl_seq, NULL,
731 prefix, ss->ss_server_seq);
732 ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
733 OBD_FREE(prefix, MAX_OBD_NAME + 5);
735 GOTO(out_free_seq, rc);
740 OBD_FREE_PTR(ed->ed_cl_seq);
741 ed->ed_cl_seq = NULL;
745 static int echo_fid_fini(struct obd_device *obddev)
747 struct echo_device *ed = obd2echo_dev(obddev);
751 seq_client_fini(ed->ed_cl_seq);
752 OBD_FREE_PTR(ed->ed_cl_seq);
753 ed->ed_cl_seq = NULL;
759 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
762 if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
763 local_oid_storage_fini(env, ed->ed_los);
769 echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
770 struct local_oid_storage *los,
771 const struct lu_fid *pfid, const char *name,
772 __u32 mode, struct lu_fid *fid)
774 struct dt_object *parent = NULL;
775 struct dt_object *dto = NULL;
779 LASSERT(!fid_is_zero(pfid));
780 parent = dt_locate(env, emd->emd_bottom, pfid);
781 if (unlikely(IS_ERR(parent)))
782 RETURN(PTR_ERR(parent));
784 /* create local file with @fid */
785 dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
788 GOTO(out_put, rc = PTR_ERR(dto));
790 *fid = *lu_object_fid(&dto->do_lu);
792 * since stack is not fully set up the local_storage uses own stack
793 * and we should drop its object from cache
795 dt_object_put_nocache(env, dto);
799 dt_object_put(env, parent);
804 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
805 struct echo_device *ed)
811 /* Setup local dirs */
812 fid.f_seq = FID_SEQ_LOCAL_NAME;
815 rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
819 lu_echo_root_fid(&fid);
820 if (echo_md_seq_site(emd)->ss_node_id == 0) {
821 rc = echo_md_local_file_create(env, emd, ed->ed_los,
822 &emd->emd_local_root_fid,
823 echo_md_root_dir_name, S_IFDIR |
824 S_IRUGO | S_IWUSR | S_IXUGO,
827 CERROR("%s: create md echo root fid failed: rc = %d\n",
828 emd2obd_dev(emd)->obd_name, rc);
832 ed->ed_root_fid = fid;
836 echo_ed_los_fini(env, ed);
840 #endif /* HAVE_SERVER_SUPPORT */
842 static struct lu_device *echo_device_alloc(const struct lu_env *env,
843 struct lu_device_type *t,
844 struct lustre_cfg *cfg)
846 struct lu_device *next;
847 struct echo_device *ed;
848 struct cl_device *cd;
849 struct obd_device *obd = NULL; /* to keep compiler happy */
850 struct obd_device *tgt;
851 const char *tgt_type_name;
858 GOTO(out, rc = -ENOMEM);
862 rc = cl_device_init(cd, t);
866 cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
869 obd = class_name2obd(lustre_cfg_string(cfg, 0));
870 LASSERT(obd != NULL);
871 LASSERT(env != NULL);
873 tgt = class_name2obd(lustre_cfg_string(cfg, 1));
875 CERROR("Can not find tgt device %s\n",
876 lustre_cfg_string(cfg, 1));
877 GOTO(out, rc = -ENODEV);
880 next = tgt->obd_lu_dev;
882 if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
883 ed->ed_next_ismd = 1;
884 } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
885 strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
886 ed->ed_next_ismd = 0;
887 rc = echo_site_init(env, ed);
891 GOTO(out, rc = -EINVAL);
896 rc = echo_client_setup(env, obd, cfg);
900 ed->ed_ec = &obd->u.echo_client;
903 if (ed->ed_next_ismd) {
904 #ifdef HAVE_SERVER_SUPPORT
905 /* Suppose to connect to some Metadata layer */
906 struct lu_site *ls = NULL;
907 struct lu_device *ld = NULL;
908 struct md_device *md = NULL;
909 struct echo_md_device *emd = NULL;
913 CERROR("%s is not lu device type!\n",
914 lustre_cfg_string(cfg, 1));
915 GOTO(out, rc = -EINVAL);
918 tgt_type_name = lustre_cfg_string(cfg, 2);
919 if (!tgt_type_name) {
920 CERROR("%s no type name for echo %s setup\n",
921 lustre_cfg_string(cfg, 1),
922 tgt->obd_type->typ_name);
923 GOTO(out, rc = -EINVAL);
928 spin_lock(&ls->ls_ld_lock);
929 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
930 if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
935 spin_unlock(&ls->ls_ld_lock);
938 CERROR("%s is not lu device type!\n",
939 lustre_cfg_string(cfg, 1));
940 GOTO(out, rc = -EINVAL);
944 /* For MD echo client, it will use the site in MDS stack */
946 ed->ed_cl.cd_lu_dev.ld_site = ls;
947 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
949 CERROR("echo fid init error %d\n", rc);
953 md = lu2md_dev(next);
954 emd = lu2emd_dev(&md->md_lu_dev);
955 rc = echo_md_root_get(env, emd, ed);
957 CERROR("%s: get root error: rc = %d\n",
958 emd2obd_dev(emd)->obd_name, rc);
961 #else /* !HAVE_SERVER_SUPPORT */
963 "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
964 GOTO(out, rc = -EOPNOTSUPP);
965 #endif /* HAVE_SERVER_SUPPORT */
968 * if echo client is to be stacked upon ost device, the next is
969 * NULL since ost is not a clio device so far
971 if (next != NULL && !lu_device_is_cl(next))
974 tgt_type_name = tgt->obd_type->typ_name;
976 LASSERT(next != NULL);
978 GOTO(out, rc = -EBUSY);
980 next->ld_site = ed->ed_site;
981 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
982 next->ld_type->ldt_name,
987 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
992 RETURN(&cd->cd_lu_dev);
998 rc2 = echo_client_cleanup(obd);
1000 CERROR("Cleanup obd device %s error(%d)\n",
1001 obd->obd_name, rc2);
1006 echo_site_fini(env, ed);
1009 cl_device_fini(&ed->ed_cl);
1021 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
1022 const char *name, struct lu_device *next)
1028 static struct lu_device *echo_device_fini(const struct lu_env *env,
1029 struct lu_device *d)
1031 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
1032 struct lu_device *next = ed->ed_next;
1034 while (next && !ed->ed_next_ismd)
1035 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
1039 static void echo_lock_release(const struct lu_env *env,
1040 struct echo_lock *ecl,
1043 struct cl_lock *clk = echo_lock2cl(ecl);
1045 cl_lock_release(env, clk);
1048 static struct lu_device *echo_device_free(const struct lu_env *env,
1049 struct lu_device *d)
1051 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
1052 struct echo_client_obd *ec = ed->ed_ec;
1053 struct echo_object *eco;
1054 struct lu_device *next = ed->ed_next;
1056 CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
1059 lu_site_purge(env, ed->ed_site, -1);
1062 * check if there are objects still alive.
1063 * It shouldn't have any object because lu_site_purge would cleanup
1064 * all of cached objects. Anyway, probably the echo device is being
1065 * parallelly accessed.
1067 spin_lock(&ec->ec_lock);
1068 list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
1069 eco->eo_deleted = 1;
1070 spin_unlock(&ec->ec_lock);
1073 lu_site_purge(env, ed->ed_site, -1);
1076 "Waiting for the reference of echo object to be dropped\n");
1078 /* Wait for the last reference to be dropped. */
1079 spin_lock(&ec->ec_lock);
1080 while (!list_empty(&ec->ec_objects)) {
1081 spin_unlock(&ec->ec_lock);
1083 "echo_client still has objects at cleanup time, wait for 1 second\n");
1084 set_current_state(TASK_UNINTERRUPTIBLE);
1085 schedule_timeout(cfs_time_seconds(1));
1086 lu_site_purge(env, ed->ed_site, -1);
1087 spin_lock(&ec->ec_lock);
1089 spin_unlock(&ec->ec_lock);
1091 LASSERT(list_empty(&ec->ec_locks));
1093 CDEBUG(D_INFO, "No object exists, exiting...\n");
1095 echo_client_cleanup(d->ld_obd);
1096 #ifdef HAVE_SERVER_SUPPORT
1097 echo_fid_fini(d->ld_obd);
1098 echo_ed_los_fini(env, ed);
1100 while (next && !ed->ed_next_ismd)
1101 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
1103 LASSERT(ed->ed_site == d->ld_site);
1104 echo_site_fini(env, ed);
1105 cl_device_fini(&ed->ed_cl);
1108 cl_env_cache_purge(~0);
1113 static const struct lu_device_type_operations echo_device_type_ops = {
1114 .ldto_init = echo_type_init,
1115 .ldto_fini = echo_type_fini,
1117 .ldto_start = echo_type_start,
1118 .ldto_stop = echo_type_stop,
1120 .ldto_device_alloc = echo_device_alloc,
1121 .ldto_device_free = echo_device_free,
1122 .ldto_device_init = echo_device_init,
1123 .ldto_device_fini = echo_device_fini
1126 static struct lu_device_type echo_device_type = {
1127 .ldt_tags = LU_DEVICE_CL,
1128 .ldt_name = LUSTRE_ECHO_CLIENT_NAME,
1129 .ldt_ops = &echo_device_type_ops,
1130 .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1134 /** \defgroup echo_exports Exported operations
1136 * exporting functions to echo client
1141 /* Interfaces to echo client obd device */
1142 static struct echo_object *
1143 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
1146 struct echo_thread_info *info;
1147 struct echo_object_conf *conf;
1148 struct echo_object *eco;
1149 struct cl_object *obj;
1150 struct lov_oinfo *oinfo = NULL;
1156 LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
1157 LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
1159 /* Never return an object if the obd is to be freed. */
1160 if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1161 RETURN(ERR_PTR(-ENODEV));
1163 env = cl_env_get(&refcheck);
1165 RETURN((void *)env);
1167 info = echo_env_info(env);
1168 conf = &info->eti_conf;
1170 OBD_ALLOC_PTR(oinfo);
1172 GOTO(out, eco = ERR_PTR(-ENOMEM));
1174 oinfo->loi_oi = *oi;
1175 conf->eoc_cl.u.coc_oinfo = oinfo;
1179 * If echo_object_init() is successful then ownership of oinfo
1180 * is transferred to the object.
1182 conf->eoc_oinfo = &oinfo;
1184 fid = &info->eti_fid;
1185 rc = ostid_to_fid(fid, oi, 0);
1187 GOTO(out, eco = ERR_PTR(rc));
1190 * In the function below, .hs_keycmp resolves to
1191 * lu_obj_hop_keycmp()
1193 /* coverity[overrun-buffer-val] */
1194 obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1196 GOTO(out, eco = (void *)obj);
1198 eco = cl2echo_obj(obj);
1199 if (eco->eo_deleted) {
1200 cl_object_put(env, obj);
1201 eco = ERR_PTR(-EAGAIN);
1206 OBD_FREE_PTR(oinfo);
1208 cl_env_put(env, &refcheck);
1212 static int cl_echo_object_put(struct echo_object *eco)
1215 struct cl_object *obj = echo_obj2cl(eco);
1219 env = cl_env_get(&refcheck);
1221 RETURN(PTR_ERR(env));
1223 /* an external function to kill an object? */
1224 if (eco->eo_deleted) {
1225 struct lu_object_header *loh = obj->co_lu.lo_header;
1227 LASSERT(&eco->eo_hdr == luh2coh(loh));
1228 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1231 cl_object_put(env, obj);
1232 cl_env_put(env, &refcheck);
1236 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1237 u64 start, u64 end, int mode,
1238 __u64 *cookie, __u32 enqflags)
1241 struct cl_lock *lck;
1242 struct cl_object *obj;
1243 struct cl_lock_descr *descr;
1244 struct echo_thread_info *info;
1248 info = echo_env_info(env);
1250 lck = &info->eti_lock;
1251 obj = echo_obj2cl(eco);
1253 memset(lck, 0, sizeof(*lck));
1254 descr = &lck->cll_descr;
1255 descr->cld_obj = obj;
1256 descr->cld_start = cl_index(obj, start);
1257 descr->cld_end = cl_index(obj, end);
1258 descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1259 descr->cld_enq_flags = enqflags;
1262 rc = cl_lock_request(env, io, lck);
1264 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1265 struct echo_lock *el;
1267 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1268 spin_lock(&ec->ec_lock);
1269 if (list_empty(&el->el_chain)) {
1270 list_add(&el->el_chain, &ec->ec_locks);
1271 el->el_cookie = ++ec->ec_unique;
1273 atomic_inc(&el->el_refcount);
1274 *cookie = el->el_cookie;
1275 spin_unlock(&ec->ec_lock);
1280 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1283 struct echo_client_obd *ec = ed->ed_ec;
1284 struct echo_lock *ecl = NULL;
1285 struct list_head *el;
1286 int found = 0, still_used = 0;
1289 LASSERT(ec != NULL);
1290 spin_lock(&ec->ec_lock);
1291 list_for_each(el, &ec->ec_locks) {
1292 ecl = list_entry(el, struct echo_lock, el_chain);
1293 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1294 found = (ecl->el_cookie == cookie);
1296 if (atomic_dec_and_test(&ecl->el_refcount))
1297 list_del_init(&ecl->el_chain);
1303 spin_unlock(&ec->ec_lock);
1308 echo_lock_release(env, ecl, still_used);
1312 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1313 struct pagevec *pvec)
1315 struct echo_thread_info *info;
1316 struct cl_2queue *queue;
1319 info = echo_env_info(env);
1320 LASSERT(io == &info->eti_io);
1322 queue = &info->eti_queue;
1324 for (i = 0; i < pagevec_count(pvec); i++) {
1325 struct page *vmpage = pvec->pages[i];
1326 struct cl_page *page = (struct cl_page *)vmpage->private;
1328 cl_page_list_add(&queue->c2_qout, page);
1332 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1333 struct page **pages, int npages, int async)
1336 struct echo_thread_info *info;
1337 struct cl_object *obj = echo_obj2cl(eco);
1338 struct echo_device *ed = eco->eo_dev;
1339 struct cl_2queue *queue;
1341 struct cl_page *clp;
1342 struct lustre_handle lh = { 0 };
1343 int page_size = cl_page_size(obj);
1349 LASSERT((offset & ~PAGE_MASK) == 0);
1350 LASSERT(ed->ed_next != NULL);
1351 env = cl_env_get(&refcheck);
1353 RETURN(PTR_ERR(env));
1355 info = echo_env_info(env);
1357 queue = &info->eti_queue;
1359 cl_2queue_init(queue);
1361 io->ci_ignore_layout = 1;
1362 rc = cl_io_init(env, io, CIT_MISC, obj);
1367 rc = cl_echo_enqueue0(env, eco, offset,
1368 offset + npages * PAGE_SIZE - 1,
1369 rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1372 GOTO(error_lock, rc);
1374 for (i = 0; i < npages; i++) {
1376 clp = cl_page_find(env, obj, cl_index(obj, offset),
1377 pages[i], CPT_TRANSIENT);
1382 LASSERT(clp->cp_type == CPT_TRANSIENT);
1384 rc = cl_page_own(env, io, clp);
1386 LASSERT(clp->cp_state == CPS_FREEING);
1387 cl_page_put(env, clp);
1391 cl_2queue_add(queue, clp);
1394 * drop the reference count for cl_page_find, so that the page
1395 * will be freed in cl_2queue_fini.
1397 cl_page_put(env, clp);
1398 cl_page_clip(env, clp, 0, page_size);
1400 offset += page_size;
1404 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1406 async = async && (typ == CRT_WRITE);
1408 rc = cl_io_commit_async(env, io, &queue->c2_qin,
1410 echo_commit_callback);
1412 rc = cl_io_submit_sync(env, io, typ, queue, 0);
1413 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1414 async ? "async" : "sync", rc);
1417 cl_echo_cancel0(env, ed, lh.cookie);
1420 cl_2queue_discard(env, io, queue);
1421 cl_2queue_disown(env, io, queue);
1422 cl_2queue_fini(env, queue);
1423 cl_io_fini(env, io);
1425 cl_env_put(env, &refcheck);
1428 /** @} echo_exports */
1430 static u64 last_object_id;
1432 #ifdef HAVE_SERVER_SUPPORT
1433 static inline void echo_md_build_name(struct lu_name *lname, char *name,
1436 snprintf(name, ETI_NAME_LEN, "%llu", id);
1437 lname->ln_name = name;
1438 lname->ln_namelen = strlen(name);
1441 /* similar to mdt_attr_get_complex */
/* Fetch the LOV or LMV xattr (whichever ma->ma_need requests) into the
 * per-thread eti_big_lmm buffer, growing that buffer to a power-of-two
 * size when the first probe (xattr_get with LU_BUF_NULL returning the
 * required size) says the current buffer is too small.  On success
 * ma->ma_lmm/ma_lmm_size point into the thread buffer and ma_valid is
 * updated with MA_LOV or MA_LMV. */
1442 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1445 struct echo_thread_info *info = echo_env_info(env);
1450 LASSERT(ma->ma_lmm_size > 0);
1452 LASSERT(ma->ma_need & (MA_LOV | MA_LMV));
/* First pass: probe with a NULL buffer to learn the xattr size. */
1453 if (ma->ma_need & MA_LOV)
1454 rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1456 rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LMV);
1461 /* big_lmm may need to be grown */
1462 if (info->eti_big_lmmsize < rc) {
1463 int size = size_roundup_power2(rc);
1465 if (info->eti_big_lmmsize > 0) {
1466 /* free old buffer */
1467 LASSERT(info->eti_big_lmm);
1468 OBD_FREE_LARGE(info->eti_big_lmm,
1469 info->eti_big_lmmsize);
1470 info->eti_big_lmm = NULL;
1471 info->eti_big_lmmsize = 0;
1474 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1475 if (!info->eti_big_lmm)
1477 info->eti_big_lmmsize = size;
1479 LASSERT(info->eti_big_lmmsize >= rc);
/* Second pass: actually read the xattr into the (possibly regrown)
 * thread-local buffer. */
1481 info->eti_buf.lb_buf = info->eti_big_lmm;
1482 info->eti_buf.lb_len = info->eti_big_lmmsize;
1483 if (ma->ma_need & MA_LOV)
1484 rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1486 rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LMV);
1490 if (ma->ma_need & MA_LOV)
1491 ma->ma_valid |= MA_LOV;
1493 ma->ma_valid |= MA_LMV;
/* Hand the caller the thread buffer; ma_lmm_size is the actual xattr
 * length returned by mo_xattr_get, not the buffer capacity. */
1495 ma->ma_lmm = info->eti_big_lmm;
1496 ma->ma_lmm_size = rc;
/* Gather the attributes requested in ma->ma_need (MA_INODE, MA_LOV,
 * MA_LMV, MA_ACL_DEF) from @next into @ma, mirroring the server-side
 * mdt_attr_get_complex().  -ENODATA (no such xattr) is not an error;
 * -ERANGE (caller buffer too small) falls back to echo_big_lmm_get()
 * which uses a growable thread-local buffer. */
1501 static int echo_attr_get_complex(const struct lu_env *env,
1502 struct md_object *next,
1505 struct echo_thread_info *info = echo_env_info(env);
1506 struct lu_buf *buf = &info->eti_buf;
1507 umode_t mode = lu_object_attr(&next->mo_lu);
1514 if (ma->ma_need & MA_INODE) {
1515 rc = mo_attr_get(env, next, ma);
1518 ma->ma_valid |= MA_INODE;
/* LOV EA: regular files and directories (directory default layout). */
1521 if ((ma->ma_need & MA_LOV) && (S_ISREG(mode) || S_ISDIR(mode))) {
1522 LASSERT(ma->ma_lmm_size > 0);
1523 buf->lb_buf = ma->ma_lmm;
1524 buf->lb_len = ma->ma_lmm_size;
1525 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1527 ma->ma_lmm_size = rc2;
1528 ma->ma_valid |= MA_LOV;
1529 } else if (rc2 == -ENODATA) {
1531 ma->ma_lmm_size = 0;
1532 } else if (rc2 == -ERANGE) {
1533 rc2 = echo_big_lmm_get(env, next, ma);
1535 GOTO(out, rc = rc2);
1537 GOTO(out, rc = rc2);
/* LMV EA: stripe layout for striped directories only. */
1541 if ((ma->ma_need & MA_LMV) && S_ISDIR(mode)) {
1542 LASSERT(ma->ma_lmm_size > 0);
1543 buf->lb_buf = ma->ma_lmm;
1544 buf->lb_len = ma->ma_lmm_size;
1545 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV);
1547 ma->ma_lmm_size = rc2;
1548 ma->ma_valid |= MA_LMV;
1549 } else if (rc2 == -ENODATA) {
1551 ma->ma_lmm_size = 0;
1552 } else if (rc2 == -ERANGE) {
1553 rc2 = echo_big_lmm_get(env, next, ma);
1555 GOTO(out, rc = rc2);
1557 GOTO(out, rc = rc2);
1561 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
/* Default POSIX ACL: directories only; buffer supplied by the caller
 * in ma->ma_acl.  NOTE(review): no -ERANGE fallback here, unlike the
 * LOV/LMV cases above — an oversized ACL is returned as an error. */
1562 if ((ma->ma_need & MA_ACL_DEF) && S_ISDIR(mode)) {
1563 buf->lb_buf = ma->ma_acl;
1564 buf->lb_len = ma->ma_acl_size;
1565 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1567 ma->ma_acl_size = rc2;
1568 ma->ma_valid |= MA_ACL_DEF;
1569 } else if (rc2 == -ENODATA) {
1571 ma->ma_acl_size = 0;
1573 GOTO(out, rc = rc2);
1578 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1579 rc, ma->ma_valid, ma->ma_lmm);
/* Create one child object named @lname under @parent with the given
 * @fid.  First verifies via mdo_lookup that the name does not already
 * exist (-ENOENT expected), then allocates the new lu_object with
 * LOC_F_NEW and performs mdo_create on the MDD layer located by
 * ed->ed_next's device type.  Reference on ec_child is dropped on all
 * exit paths via out_put. */
1584 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1585 struct md_object *parent, struct lu_fid *fid,
1586 struct lu_name *lname, struct md_op_spec *spec,
1589 struct lu_object *ec_child, *child;
1590 struct lu_device *ld = ed->ed_next;
1591 struct echo_thread_info *info = echo_env_info(env);
1592 struct lu_fid *fid2 = &info->eti_fid2;
1593 struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
/* Name must not already exist: success here (or any error other than
 * -ENOENT) aborts the create. */
1598 rc = mdo_lookup(env, parent, lname, fid2, spec);
1601 else if (rc != -ENOENT)
1604 ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1606 if (IS_ERR(ec_child)) {
1607 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1609 RETURN(PTR_ERR(ec_child));
1612 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1614 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1615 GOTO(out_put, rc = -EINVAL);
1618 CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1619 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1622 * Do not perform lookup sanity check. We know that name does not exist.
1624 spec->sp_cr_lookup = 0;
1625 rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1627 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1630 CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc = %d\n",
1631 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1634 lu_object_put(env, ec_child);
/* Point ma->ma_lmm at a scratch buffer sized for the next layer: for a
 * non-MDD next device use the small fixed eti_lmm, otherwise use the
 * preallocated eti_big_lmm (must already be set up, see LASSERT). */
1638 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1641 struct echo_thread_info *info = echo_env_info(env);
/* strcmp != 0 means the next layer is NOT mdd — small buffer is enough */
1643 if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1644 ma->ma_lmm = (void *)&info->eti_lmm;
1645 ma->ma_lmm_size = sizeof(info->eti_lmm);
1647 LASSERT(info->eti_big_lmmsize);
1648 ma->ma_lmm = info->eti_big_lmm;
1649 ma->ma_lmm_size = info->eti_big_lmmsize;
/* For a (possibly striped) directory @obj, pick the stripe that the
 * name (@name/@namelen, or a name built from @id when no explicit name
 * is given) hashes to, and return it in @new_parent.  For an unstriped
 * directory *new_parent is presumably obj itself (the MA_LMV-absent
 * branch is partly elided here — NOTE(review): confirm).  The caller
 * owns a reference on *new_parent when it differs from obj. */
1656 echo_md_dir_stripe_choose(const struct lu_env *env, struct echo_device *ed,
1657 struct lu_object *obj, const char *name,
1658 unsigned int namelen, __u64 id,
1659 struct lu_object **new_parent)
1661 struct echo_thread_info *info = echo_env_info(env);
1662 struct md_attr *ma = &info->eti_ma;
1663 struct lmv_mds_md_v1 *lmv;
1664 struct lu_device *ld = ed->ed_next;
1666 struct lu_name tmp_ln_name;
1667 struct lu_fid stripe_fid;
1668 struct lu_object *stripe_obj;
1671 LASSERT(obj != NULL);
1672 LASSERT(S_ISDIR(obj->lo_header->loh_attr));
/* Read the directory's LMV EA to learn its striping. */
1674 memset(ma, 0, sizeof(*ma));
1675 echo_set_lmm_size(env, ld, ma);
1676 ma->ma_need = MA_LMV;
1677 rc = echo_attr_get_complex(env, lu2md(obj), ma);
1679 CERROR("Can not getattr child "DFID": rc = %d\n",
1680 PFID(lu_object_fid(obj)), rc);
1684 if (!(ma->ma_valid & MA_LMV)) {
1689 lmv = (struct lmv_mds_md_v1 *)ma->ma_lmm;
1690 if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) {
1692 CERROR("Invalid mds md magic %x "DFID": rc = %d\n",
1693 le32_to_cpu(lmv->lmv_magic), PFID(lu_object_fid(obj)),
/* Build the name to hash: explicit name if given, otherwise the
 * numeric id formatted by echo_md_build_name. */
1699 tmp_ln_name.ln_name = name;
1700 tmp_ln_name.ln_namelen = namelen;
1703 echo_md_build_name(&tmp_ln_name, info->eti_name, id);
/* Hash type is fixed to FNV-1a here regardless of the on-disk LMV
 * hash — NOTE(review): matches echo-client test usage, verify. */
1706 idx = lmv_name_to_stripe_index(LMV_HASH_TYPE_FNV_1A_64,
1707 le32_to_cpu(lmv->lmv_stripe_count),
1708 tmp_ln_name.ln_name, tmp_ln_name.ln_namelen);
1710 LASSERT(idx < le32_to_cpu(lmv->lmv_stripe_count));
1711 fid_le_to_cpu(&stripe_fid, &lmv->lmv_stripe_fids[idx]);
1713 stripe_obj = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, &stripe_fid,
1715 if (IS_ERR(stripe_obj)) {
1716 rc = PTR_ERR(stripe_obj);
1717 CERROR("Can not find the parent "DFID": rc = %d\n",
1718 PFID(&stripe_fid), rc);
1722 *new_parent = lu_object_locate(stripe_obj->lo_header, ld->ld_type);
1724 lu_object_put(env, stripe_obj);
/* Handler for ECHO_MD_CREATE/ECHO_MD_MKDIR: create either a single
 * named object (@name given) or @count objects named by successive
 * numeric ids starting at @id, under the stripe of @ec_parent chosen
 * by echo_md_dir_stripe_choose().  @stripe_count/@stripe_offset, when
 * non-zero, are packed into an LMV (directories) or LOV v3 (files)
 * user EA passed through md_op_spec. */
1731 static int echo_create_md_object(const struct lu_env *env,
1732 struct echo_device *ed,
1733 struct lu_object *ec_parent,
1735 char *name, int namelen,
1736 __u64 id, __u32 mode, int count,
1737 int stripe_count, int stripe_offset)
1739 struct lu_object *parent;
1740 struct lu_object *new_parent;
1741 struct echo_thread_info *info = echo_env_info(env);
1742 struct lu_name *lname = &info->eti_lname;
1743 struct md_op_spec *spec = &info->eti_spec;
1744 struct md_attr *ma = &info->eti_ma;
1745 struct lu_device *ld = ed->ed_next;
1753 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1757 rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
1762 LASSERT(new_parent != NULL);
1763 memset(ma, 0, sizeof(*ma));
1764 memset(spec, 0, sizeof(*spec));
1765 echo_set_lmm_size(env, ld, ma);
/* Optional striping EA: -1 means "default striping", any other
 * non-zero value is an explicit stripe count. */
1766 if (stripe_count != 0) {
1767 spec->sp_cr_flags |= MDS_FMODE_WRITE;
1768 if (stripe_count != -1) {
1769 if (S_ISDIR(mode)) {
1770 struct lmv_user_md *lmu;
1772 lmu = (struct lmv_user_md *)&info->eti_lum;
1773 lmu->lum_magic = LMV_USER_MAGIC;
1774 lmu->lum_stripe_offset = stripe_offset;
1775 lmu->lum_stripe_count = stripe_count;
1776 lmu->lum_hash_type = LMV_HASH_TYPE_FNV_1A_64;
1777 spec->u.sp_ea.eadata = lmu;
1778 spec->u.sp_ea.eadatalen = sizeof(*lmu);
1780 struct lov_user_md_v3 *lum = &info->eti_lum;
1782 lum->lmm_magic = LOV_USER_MAGIC_V3;
1783 lum->lmm_stripe_count = stripe_count;
1784 lum->lmm_stripe_offset = stripe_offset;
1785 lum->lmm_pattern = LOV_PATTERN_NONE;
1786 spec->u.sp_ea.eadata = lum;
1787 spec->u.sp_ea.eadatalen = sizeof(*lum);
1789 spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1793 ma->ma_attr.la_mode = mode;
1794 ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1795 ma->ma_attr.la_ctime = ktime_get_real_seconds();
1798 lname->ln_name = name;
1799 lname->ln_namelen = namelen;
1800 /* If name is specified, only create one object by name */
1801 rc = echo_md_create_internal(env, ed, lu2md(new_parent), fid,
1806 /* Create multiple object sequenced by id */
1807 for (i = 0; i < count; i++) {
1808 char *tmp_name = info->eti_name;
1810 echo_md_build_name(lname, tmp_name, id);
1812 rc = echo_md_create_internal(env, ed, lu2md(new_parent),
1813 fid, lname, spec, ma);
1815 CERROR("Can not create child %s: rc = %d\n", tmp_name,
/* Drop the stripe reference taken by echo_md_dir_stripe_choose. */
1824 if (new_parent != parent)
1825 lu_object_put(env, new_parent);
/* Look up @lname under @parent via mdo_lookup and return a referenced
 * lu_object for the resulting FID (caller must lu_object_put it), or
 * ERR_PTR on lookup failure. */
1830 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1831 struct echo_device *ed,
1832 struct md_object *parent,
1833 struct lu_name *lname)
1835 struct echo_thread_info *info = echo_env_info(env);
1836 struct lu_fid *fid = &info->eti_fid;
1837 struct lu_object *child;
1841 CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1844 rc = mdo_lookup(env, parent, lname, fid, NULL);
1846 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1847 RETURN(ERR_PTR(rc));
1851 * In the function below, .hs_keycmp resolves to
1852 * lu_obj_hop_keycmp()
1854 /* coverity[overrun-buffer-val] */
1855 child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
/* Handler for ECHO_MD_SETATTR: for @count children of @ec_parent named
 * by successive ids starting at @id, set a "user.<name>.test1" xattr
 * on each via mo_xattr_set.  Despite the name this exercises xattr-set,
 * not mo_attr_set.  Each child reference is dropped before moving on. */
1860 static int echo_setattr_object(const struct lu_env *env,
1861 struct echo_device *ed,
1862 struct lu_object *ec_parent,
1863 __u64 id, int count)
1865 struct lu_object *parent;
1866 struct lu_object *new_parent;
1867 struct echo_thread_info *info = echo_env_info(env);
1868 struct lu_name *lname = &info->eti_lname;
1869 char *name = info->eti_name;
1870 struct lu_device *ld = ed->ed_next;
1871 struct lu_buf *buf = &info->eti_buf;
1879 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1883 rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
1888 for (i = 0; i < count; i++) {
1889 struct lu_object *ec_child, *child;
1891 echo_md_build_name(lname, name, id);
1893 ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
1894 if (IS_ERR(ec_child)) {
1895 rc = PTR_ERR(ec_child);
1896 CERROR("Can't find child %s: rc = %d\n",
1897 lname->ln_name, rc);
1901 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1903 CERROR("Can not locate the child %s\n", lname->ln_name);
1904 lu_object_put(env, ec_child);
1909 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1910 PFID(lu_object_fid(child)));
/* NOTE(review): `name` is reused here as the xattr name, clobbering
 * the child-name buffer built above — intentional reuse, but the next
 * iteration rebuilds it via echo_md_build_name. */
1912 buf->lb_buf = info->eti_xattr_buf;
1913 buf->lb_len = sizeof(info->eti_xattr_buf);
1915 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1916 rc = mo_xattr_set(env, lu2md(child), buf, name,
1919 CERROR("Can not setattr child "DFID": rc = %d\n",
1920 PFID(lu_object_fid(child)), rc);
1921 lu_object_put(env, ec_child);
1924 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1925 PFID(lu_object_fid(child)));
1927 lu_object_put(env, ec_child);
1930 if (new_parent != parent)
1931 lu_object_put(env, new_parent);
/* Handler for ECHO_MD_GETATTR: for @count children named by successive
 * ids starting at @id, fetch a full attribute set (inode attrs, LOV EA,
 * parent FID, HSM state, default ACL) via echo_attr_get_complex().
 * On lookup failure the function RETURNs immediately — NOTE(review):
 * new_parent's reference release on that path is not visible in this
 * elided view; verify against the full source. */
1936 static int echo_getattr_object(const struct lu_env *env,
1937 struct echo_device *ed,
1938 struct lu_object *ec_parent,
1939 __u64 id, int count)
1941 struct lu_object *parent;
1942 struct lu_object *new_parent;
1943 struct echo_thread_info *info = echo_env_info(env);
1944 struct lu_name *lname = &info->eti_lname;
1945 char *name = info->eti_name;
1946 struct md_attr *ma = &info->eti_ma;
1947 struct lu_device *ld = ed->ed_next;
1955 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1959 rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
1964 memset(ma, 0, sizeof(*ma));
1965 ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1966 ma->ma_acl = info->eti_xattr_buf;
1967 ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1969 for (i = 0; i < count; i++) {
1970 struct lu_object *ec_child, *child;
1973 echo_md_build_name(lname, name, id);
/* ma_lmm is re-pointed every iteration because echo_attr_get_complex
 * may have redirected it to the big thread buffer. */
1974 echo_set_lmm_size(env, ld, ma);
1976 ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
1977 if (IS_ERR(ec_child)) {
1978 CERROR("Can't find child %s: rc = %ld\n",
1979 lname->ln_name, PTR_ERR(ec_child));
1980 RETURN(PTR_ERR(ec_child));
1983 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1985 CERROR("Can not locate the child %s\n", lname->ln_name);
1986 lu_object_put(env, ec_child);
1990 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1991 PFID(lu_object_fid(child)));
1992 rc = echo_attr_get_complex(env, lu2md(child), ma);
1994 CERROR("Can not getattr child "DFID": rc = %d\n",
1995 PFID(lu_object_fid(child)), rc);
1996 lu_object_put(env, ec_child);
1999 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
2000 PFID(lu_object_fid(child)));
2002 lu_object_put(env, ec_child);
2005 if (new_parent != parent)
2006 lu_object_put(env, new_parent);
/* Handler for ECHO_MD_LOOKUP: perform @count mdo_lookup calls for names
 * built from successive ids starting at @id under the chosen stripe of
 * @ec_parent.  Pure lookup benchmark — the resulting FIDs are ignored. */
2011 static int echo_lookup_object(const struct lu_env *env,
2012 struct echo_device *ed,
2013 struct lu_object *ec_parent,
2014 __u64 id, int count)
2016 struct lu_object *parent;
2017 struct lu_object *new_parent;
2018 struct echo_thread_info *info = echo_env_info(env);
2019 struct lu_name *lname = &info->eti_lname;
2020 char *name = info->eti_name;
2021 struct lu_fid *fid = &info->eti_fid;
2022 struct lu_device *ld = ed->ed_next;
2028 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
2032 rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
2037 /*prepare the requests*/
2038 for (i = 0; i < count; i++) {
2039 echo_md_build_name(lname, name, id);
2041 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
2042 PFID(lu_object_fid(new_parent)), lname->ln_name,
2045 rc = mdo_lookup(env, lu2md(new_parent), lname, fid, NULL);
2047 CERROR("Can not lookup child %s: rc = %d\n", name, rc);
2051 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
2052 PFID(lu_object_fid(new_parent)), lname->ln_name,
2058 if (new_parent != parent)
2059 lu_object_put(env, new_parent);
/* Unlink one child named @lname from @parent: look it up, locate it on
 * the next (MDD) layer, refuse remote objects (-EPERM), then
 * mdo_unlink.  The lookup reference is dropped at out_put on all
 * paths. */
2064 static int echo_md_destroy_internal(const struct lu_env *env,
2065 struct echo_device *ed,
2066 struct md_object *parent,
2067 struct lu_name *lname,
2070 struct lu_device *ld = ed->ed_next;
2071 struct lu_object *ec_child;
2072 struct lu_object *child;
2077 ec_child = echo_md_lookup(env, ed, parent, lname);
2078 if (IS_ERR(ec_child)) {
2079 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
2081 RETURN(PTR_ERR(ec_child));
2084 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
2086 CERROR("Can not locate the child %s\n", lname->ln_name);
2087 GOTO(out_put, rc = -EINVAL);
/* Cross-MDT (remote) objects cannot be destroyed through this path. */
2090 if (lu_object_remote(child)) {
2091 CERROR("Can not destroy remote object %s: rc = %d\n",
2092 lname->ln_name, -EPERM);
2093 GOTO(out_put, rc = -EPERM);
2095 CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
2096 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
2098 rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
2100 CERROR("Can not unlink child %s: rc = %d\n",
2101 lname->ln_name, rc);
2104 CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
2105 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
2107 lu_object_put(env, ec_child);
/* Handler for ECHO_MD_DESTROY/ECHO_MD_RMDIR: unlink either one named
 * child (@name given) or @count children named by successive ids
 * starting at @id, via echo_md_destroy_internal(), under the stripe of
 * @ec_parent selected by echo_md_dir_stripe_choose(). */
2111 static int echo_destroy_object(const struct lu_env *env,
2112 struct echo_device *ed,
2113 struct lu_object *ec_parent,
2114 char *name, int namelen,
2115 __u64 id, __u32 mode,
2118 struct echo_thread_info *info = echo_env_info(env);
2119 struct lu_name *lname = &info->eti_lname;
2120 struct md_attr *ma = &info->eti_ma;
2121 struct lu_device *ld = ed->ed_next;
2122 struct lu_object *parent;
2123 struct lu_object *new_parent;
2128 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
2132 rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
/* md_attr carries the mode and ctime for the unlink transaction. */
2137 memset(ma, 0, sizeof(*ma));
2138 ma->ma_attr.la_mode = mode;
2139 ma->ma_attr.la_valid = LA_CTIME;
2140 ma->ma_attr.la_ctime = ktime_get_real_seconds();
2141 ma->ma_need = MA_INODE;
2145 lname->ln_name = name;
2146 lname->ln_namelen = namelen;
2147 rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
2152 /*prepare the requests*/
2153 for (i = 0; i < count; i++) {
2154 char *tmp_name = info->eti_name;
2157 echo_md_build_name(lname, tmp_name, id);
2159 rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
2162 CERROR("Can not unlink child %s: rc = %d\n", name, rc);
2169 if (new_parent != parent)
2170 lu_object_put(env, new_parent);
/* Walk a '/'-separated @path component by component starting from the
 * echo device's root FID, doing an echo_md_lookup per component.  The
 * reference on each intermediate parent is dropped as the walk
 * advances; on success the caller receives the referenced leaf object,
 * on failure ERR_PTR(rc). */
2175 static struct lu_object *echo_resolve_path(const struct lu_env *env,
2176 struct echo_device *ed, char *path,
2179 struct lu_device *ld = ed->ed_next;
2180 struct echo_thread_info *info = echo_env_info(env);
2181 struct lu_fid *fid = &info->eti_fid;
2182 struct lu_name *lname = &info->eti_lname;
2183 struct lu_object *parent = NULL;
2184 struct lu_object *child = NULL;
2188 *fid = ed->ed_root_fid;
2191 * In the function below, .hs_keycmp resolves to
2192 * lu_obj_hop_keycmp()
2194 /* coverity[overrun-buffer-val] */
2195 parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
2196 if (IS_ERR(parent)) {
2197 CERROR("Can not find the parent "DFID": rc = %ld\n",
2198 PFID(fid), PTR_ERR(parent));
2203 struct lu_object *ld_parent;
/* strsep consumes @path in place; empty trailing components end
 * the walk. */
2206 e = strsep(&path, "/");
2211 if (!path || path[0] == '\0')
2217 lname->ln_namelen = strlen(e);
2219 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
2221 lu_object_put(env, parent);
2226 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
2227 lu_object_put(env, parent);
2228 if (IS_ERR(child)) {
2229 rc = (int)PTR_ERR(child);
2230 CERROR("lookup %s under parent "DFID": rc = %d\n",
2231 lname->ln_name, PFID(lu_object_fid(ld_parent)),
2238 RETURN(ERR_PTR(rc));
/* Populate the env's lu_ucred from the current task's credentials
 * (uid/gid/fsuid/fsgid mapped through init_user_ns, plus capability
 * mask), so MD operations run with the caller's identity.  Must be
 * paired with echo_ucred_fini(). */
2243 static void echo_ucred_init(struct lu_env *env)
2245 struct lu_ucred *ucred = lu_ucred(env);
2247 ucred->uc_valid = UCRED_INVALID;
2249 ucred->uc_suppgids[0] = -1;
2250 ucred->uc_suppgids[1] = -1;
2252 ucred->uc_uid = ucred->uc_o_uid =
2253 from_kuid(&init_user_ns, current_uid());
2254 ucred->uc_gid = ucred->uc_o_gid =
2255 from_kgid(&init_user_ns, current_gid());
2256 ucred->uc_fsuid = ucred->uc_o_fsuid =
2257 from_kuid(&init_user_ns, current_fsuid());
2258 ucred->uc_fsgid = ucred->uc_o_fsgid =
2259 from_kgid(&init_user_ns, current_fsgid());
2260 ucred->uc_cap = cfs_curproc_cap_pack();
2262 /* remove fs privilege for non-root user. */
2263 if (ucred->uc_fsuid)
2264 ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2265 ucred->uc_valid = UCRED_NEW;
/* Invalidate the credentials set up by echo_ucred_init() by resetting
 * the ucred state back to UCRED_INIT. */
2268 static void echo_ucred_fini(struct lu_env *env)
2270 struct lu_ucred *ucred = lu_ucred(env);
2272 ucred->uc_valid = UCRED_INIT;
/* Dispatcher for OBD_IOC_ECHO_MD sub-commands (create/mkdir, destroy/
 * rmdir, lookup, getattr, setattr).  Sets up a cl_env refilled with MD
 * context tags, allocates the per-thread big LMM buffer, resolves
 * @path to the parent object, optionally copies an object name from
 * userspace (data->ioc_pbuf2, length ioc_plen2), and runs the command
 * under caller credentials (echo_ucred_init/fini).  Only an MDD next
 * layer is supported.  Cleanup is staged via out_name/out_put/
 * out_free/out_env labels (partly elided in this view). */
2275 static int echo_md_handler(struct echo_device *ed, int command,
2276 char *path, int path_len, __u64 id, int count,
2277 struct obd_ioctl_data *data)
2279 struct echo_thread_info *info;
2280 struct lu_device *ld = ed->ed_next;
2283 struct lu_object *parent;
2285 int namelen = data->ioc_plen2;
2290 CERROR("MD echo client is not being initialized properly\n");
2294 if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2295 CERROR("Only support MDD layer right now!\n");
2299 env = cl_env_get(&refcheck);
2301 RETURN(PTR_ERR(env));
2303 rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_SES_TAG);
2307 /* init big_lmm buffer */
2308 info = echo_env_info(env);
2309 LASSERT(info->eti_big_lmm == NULL);
2310 OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2311 if (!info->eti_big_lmm)
2312 GOTO(out_env, rc = -ENOMEM);
2313 info->eti_big_lmmsize = MIN_MD_SIZE;
2315 parent = echo_resolve_path(env, ed, path, path_len);
2316 if (IS_ERR(parent)) {
2317 CERROR("Can not resolve the path %s: rc = %ld\n", path,
2319 GOTO(out_free, rc = PTR_ERR(parent));
/* Optional object name copied in from userspace; +1 for the NUL. */
2323 OBD_ALLOC(name, namelen + 1);
2325 GOTO(out_put, rc = -ENOMEM);
2326 if (copy_from_user(name, data->ioc_pbuf2, namelen))
2327 GOTO(out_name, rc = -EFAULT);
2330 echo_ucred_init(env);
2333 case ECHO_MD_CREATE:
2334 case ECHO_MD_MKDIR: {
2335 struct echo_thread_info *info = echo_env_info(env);
2336 __u32 mode = data->ioc_obdo2.o_mode;
2337 struct lu_fid *fid = &info->eti_fid;
2338 int stripe_count = (int)data->ioc_obdo2.o_misc;
2339 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2341 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2346 * In the function below, .hs_keycmp resolves to
2347 * lu_obj_hop_keycmp()
2349 /* coverity[overrun-buffer-val] */
2350 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2351 id, mode, count, stripe_count,
2355 case ECHO_MD_DESTROY:
2356 case ECHO_MD_RMDIR: {
2357 __u32 mode = data->ioc_obdo2.o_mode;
2359 rc = echo_destroy_object(env, ed, parent, name, namelen,
2363 case ECHO_MD_LOOKUP:
2364 rc = echo_lookup_object(env, ed, parent, id, count);
2366 case ECHO_MD_GETATTR:
2367 rc = echo_getattr_object(env, ed, parent, id, count);
2369 case ECHO_MD_SETATTR:
2370 rc = echo_setattr_object(env, ed, parent, id, count);
2373 CERROR("unknown command %d\n", command);
2377 echo_ucred_fini(env);
2381 OBD_FREE(name, namelen + 1);
2383 lu_object_put(env, parent);
/* Release the big LMM buffer allocated above for this call. */
2385 LASSERT(info->eti_big_lmm);
2386 OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2387 info->eti_big_lmm = NULL;
2388 info->eti_big_lmmsize = 0;
2390 cl_env_put(env, &refcheck);
2393 #endif /* HAVE_SERVER_SUPPORT */
/* OBD_IOC_CREATE: create one echo object.  Requires FLID+FLGROUP and
 * an echo FID sequence in @oa; an object id of 0 means "allocate the
 * next id from last_object_id".  After obd_create, the echo_object is
 * instantiated once via cl_echo_object_find and immediately released
 * (sanity check); on later failure the created object is destroyed. */
2395 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2398 struct echo_object *eco;
2399 struct echo_client_obd *ec = ed->ed_ec;
2404 if (!(oa->o_valid & OBD_MD_FLID) ||
2405 !(oa->o_valid & OBD_MD_FLGROUP) ||
2406 !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
2407 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2411 if (ostid_id(&oa->o_oi) == 0) {
2412 rc = ostid_set_id(&oa->o_oi, ++last_object_id);
2417 rc = obd_create(env, ec->ec_exp, oa);
2419 CERROR("Cannot create objects: rc = %d\n", rc);
2425 oa->o_valid |= OBD_MD_FLID;
2427 eco = cl_echo_object_find(ed, &oa->o_oi);
2429 GOTO(failed, rc = PTR_ERR(eco));
2430 cl_echo_object_put(eco);
2432 CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
/* Undo the create if instantiation failed after obd_create succeeded. */
2436 if (created && rc != 0)
2437 obd_destroy(env, ec->ec_exp, oa);
2440 CERROR("create object failed with: rc = %d\n", rc);
/* Validate @oa (needs FLID, FLGROUP and a non-zero object id) and
 * return the referenced echo_object for it in *ecop via
 * cl_echo_object_find.  Pair with echo_put_object(). */
2445 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2448 struct echo_object *eco;
2452 if (!(oa->o_valid & OBD_MD_FLID) ||
2453 !(oa->o_valid & OBD_MD_FLGROUP) ||
2454 ostid_id(&oa->o_oi) == 0) {
2455 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2460 eco = cl_echo_object_find(ed, &oa->o_oi);
/* Drop the reference taken by echo_get_object(); a failure to release
 * is only logged — there is nothing the caller can do about it. */
2469 static void echo_put_object(struct echo_object *eco)
2473 rc = cl_echo_object_put(eco);
2475 CERROR("%s: echo client drop an object failed: rc = %d\n",
2476 eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
/* Fill @page with debug patterns, one OBD_ECHO_BLOCK_SIZE block at a
 * time: for writes, each block encodes its object id and file offset
 * (verifiable later by echo_client_page_debug_check); for reads, a
 * poison value (0xdeadbeef00c0ffee) that the real data must overwrite. */
2479 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
2480 u64 offset, u64 count)
2487 /* no partial pages on the client */
2488 LASSERT(count == PAGE_SIZE);
2492 for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2493 if (rw == OBD_BRW_WRITE) {
2494 stripe_off = offset + delta;
2497 stripe_off = 0xdeadbeef00c0ffeeULL;
2498 stripe_id = 0xdeadbeef00c0ffeeULL;
2500 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2501 stripe_off, stripe_id);
/* Verify the per-block debug pattern written by
 * echo_client_page_debug_setup: every OBD_ECHO_BLOCK_SIZE block of
 * @page must encode @id and its offset.  Returns non-zero (and logs)
 * if any block mismatches. */
2508 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
2517 /* no partial pages on the client */
2518 LASSERT(count == PAGE_SIZE);
2522 for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2523 stripe_off = offset + delta;
2526 rc2 = block_debug_check("test_brw",
2527 addr + delta, OBD_ECHO_BLOCK_SIZE,
2528 stripe_off, stripe_id);
2530 CERROR("Error in echo object %#llx\n", id);
/* Kernel-buffer bulk read/write (ioctl test modes 1/2): allocate
 * @count/PAGE_SIZE pages, optionally stamp them with debug patterns,
 * push them through cl_echo_object_brw, then (for reads with
 * OBD_FL_DEBUG_CHECK) verify the patterns.  @count must be
 * page-aligned.  GFP zone alternates by object id (kernel vs highuser)
 * to exercise both.  All pages and the pga/pages arrays are freed on
 * exit (cleanup partly elided in this view). */
2539 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2540 struct echo_object *eco, u64 offset,
2541 u64 count, int async)
2544 struct brw_page *pga;
2545 struct brw_page *pgp;
2546 struct page **pages;
/* Pattern verification only for non-persistent objects that asked
 * for it via OBD_FL_DEBUG_CHECK. */
2555 verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2556 (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2557 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2559 gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
2561 LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2563 if ((count & (~PAGE_MASK)) != 0)
2566 /* XXX think again with misaligned I/O */
2567 npages = count >> PAGE_SHIFT;
2569 if (rw == OBD_BRW_WRITE)
2570 brw_flags = OBD_BRW_ASYNC;
2572 OBD_ALLOC(pga, npages * sizeof(*pga));
2576 OBD_ALLOC(pages, npages * sizeof(*pages));
2578 OBD_FREE(pga, npages * sizeof(*pga));
2582 for (i = 0, pgp = pga, off = offset;
2584 i++, pgp++, off += PAGE_SIZE) {
2586 LASSERT(pgp->pg == NULL); /* for cleanup */
2589 pgp->pg = alloc_page(gfp_mask);
2594 pgp->count = PAGE_SIZE;
2596 pgp->flag = brw_flags;
2599 echo_client_page_debug_setup(pgp->pg, rw,
2600 ostid_id(&oa->o_oi), off,
2604 /* brw mode can only be used at client */
2605 LASSERT(ed->ed_next != NULL);
2606 rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2609 if (rc != 0 || rw != OBD_BRW_READ)
/* Verify read data; first failure is kept in rc, but all pages are
 * still checked and freed. */
2612 for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2619 vrc = echo_client_page_debug_check(pgp->pg,
2620 ostid_id(&oa->o_oi),
2623 if (vrc != 0 && rc == 0)
2626 __free_page(pgp->pg);
2628 OBD_FREE(pga, npages * sizeof(*pga));
2629 OBD_FREE(pages, npages * sizeof(*pages));
/* Server-side bulk I/O (ioctl test mode 3): drive the target directly
 * through the obd_preprw/obd_commitrw pair, in batches of at most
 * @batch bytes (apc pages) until @count bytes are covered.  Pages
 * handed back by preprw may be stamped/verified with the debug pattern
 * when OBD_FL_DEBUG_CHECK is set.  The env contexts are exited and
 * re-entered between batches to recycle per-request state. */
2633 static int echo_client_prep_commit(const struct lu_env *env,
2634 struct obd_export *exp, int rw,
2635 struct obdo *oa, struct echo_object *eco,
2636 u64 offset, u64 count,
2637 u64 batch, int async)
2639 struct obd_ioobj ioo;
2640 struct niobuf_local *lnb;
2641 struct niobuf_remote rnb;
2643 u64 npages, tot_pages, apc;
2644 int i, ret = 0, brw_flags = 0;
2647 if (count <= 0 || (count & ~PAGE_MASK) != 0)
/* apc = pages per batch, tot_pages = total pages to transfer. */
2650 apc = npages = batch >> PAGE_SHIFT;
2651 tot_pages = count >> PAGE_SHIFT;
2653 OBD_ALLOC_LARGE(lnb, apc * sizeof(struct niobuf_local));
2657 if (rw == OBD_BRW_WRITE && async)
2658 brw_flags |= OBD_BRW_ASYNC;
2660 obdo_to_ioobj(oa, &ioo);
2664 for (; tot_pages > 0; tot_pages -= npages) {
/* Last batch may be short. */
2667 if (tot_pages < npages)
2670 rnb.rnb_offset = off;
2671 rnb.rnb_len = npages * PAGE_SIZE;
2672 rnb.rnb_flags = brw_flags;
2674 off += npages * PAGE_SIZE;
2677 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
2681 for (i = 0; i < lpages; i++) {
2682 struct page *page = lnb[i].lnb_page;
2684 /* read past eof? */
2685 if (!page && lnb[i].lnb_rc == 0)
2689 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
/* Skip pattern handling unless debug checking was requested. */
2691 if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2692 (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2693 (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2696 if (rw == OBD_BRW_WRITE)
2697 echo_client_page_debug_setup(page, rw,
2698 ostid_id(&oa->o_oi),
2699 lnb[i].lnb_file_offset,
2702 echo_client_page_debug_check(page,
2703 ostid_id(&oa->o_oi),
2704 lnb[i].lnb_file_offset,
2708 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
2713 /* Reuse env context. */
2714 lu_context_exit((struct lu_context *)&env->le_ctx);
2715 lu_context_enter((struct lu_context *)&env->le_ctx);
2719 OBD_FREE_LARGE(lnb, apc * sizeof(struct niobuf_local));
/* OBD_IOC_BRW_READ/WRITE entry: resolve the echo object, then dispatch
 * on test_mode (passed in ioc_pbuf1): kernel-buffer brw via
 * echo_client_kbrw, or prep/commit via echo_client_prep_commit.  With
 * no lower cl device (ed_next == NULL, i.e. OFD/obdfilter target) only
 * prep/commit mode (3) is valid.  ioc_plen1 is the batch size, capped
 * at PTLRPC_MAX_BRW_SIZE. */
2724 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2725 struct obd_export *exp,
2726 struct obd_ioctl_data *data)
2728 struct obd_device *obd = class_exp2obd(exp);
2729 struct echo_device *ed = obd2echo_dev(obd);
2730 struct echo_client_obd *ec = ed->ed_ec;
2731 struct obdo *oa = &data->ioc_obdo1;
2732 struct echo_object *eco;
2738 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2740 rc = echo_get_object(&eco, ed, oa);
2744 oa->o_valid &= ~OBD_MD_FLHANDLE;
2746 /* OFD/obdfilter works only via prep/commit */
2747 test_mode = (long)data->ioc_pbuf1;
2748 if (!ed->ed_next && test_mode != 3) {
2750 data->ioc_plen1 = data->ioc_count;
2756 /* Truncate batch size to maximum */
2757 if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2758 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2760 switch (test_mode) {
2764 rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
2765 data->ioc_count, async);
2768 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
2769 data->ioc_offset, data->ioc_count,
2770 data->ioc_plen1, async);
2776 echo_put_object(eco);
/* Main ioctl dispatcher for the echo client obd.  Normalizes the FID
 * group in the user obdo, acquires and tag-refills a cl_env (MD tags
 * for the metadata commands, DT tags otherwise), clears the target
 * session info so operations run locally, then dispatches by cmd.
 * Most commands require CAP_SYS_ADMIN.  The env is released at out. */
2782 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2783 void *karg, void __user *uarg)
2785 #ifdef HAVE_SERVER_SUPPORT
2786 struct tgt_session_info *tsi;
2788 struct obd_device *obd = exp->exp_obd;
2789 struct echo_device *ed = obd2echo_dev(obd);
2790 struct echo_client_obd *ec = ed->ed_ec;
2791 struct echo_object *eco;
2792 struct obd_ioctl_data *data = karg;
2794 unsigned long env_tags = 0;
2798 int rw = OBD_BRW_READ;
2802 oa = &data->ioc_obdo1;
2803 if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2804 oa->o_valid |= OBD_MD_FLGROUP;
2805 ostid_set_seq_echo(&oa->o_oi);
2808 /* This FID is unpacked just for validation at this point */
2809 rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2813 env = cl_env_get(&refcheck);
2815 RETURN(PTR_ERR(env));
2819 #ifdef HAVE_SERVER_SUPPORT
2820 if (cmd == OBD_IOC_ECHO_MD || cmd == OBD_IOC_ECHO_ALLOC_SEQ)
2821 env_tags = ECHO_MD_CTX_TAG;
2824 env_tags = ECHO_DT_CTX_TAG;
2826 rc = lu_env_refill_by_tags(env, env_tags, ECHO_SES_TAG);
2830 #ifdef HAVE_SERVER_SUPPORT
2831 tsi = tgt_ses_info(env);
2832 /* treat as local operation */
2833 tsi->tsi_exp = NULL;
2834 tsi->tsi_jobid = NULL;
2838 case OBD_IOC_CREATE: /* may create echo object */
2839 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2840 GOTO(out, rc = -EPERM);
2842 rc = echo_create_object(env, ed, oa);
2845 #ifdef HAVE_SERVER_SUPPORT
2846 case OBD_IOC_ECHO_MD: {
2853 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2854 GOTO(out, rc = -EPERM);
2856 count = data->ioc_count;
2857 cmd = data->ioc_command;
2859 id = data->ioc_obdo2.o_oi.oi.oi_id;
2860 dirlen = data->ioc_plen1;
/* Copy the target directory path from userspace (+1 for NUL). */
2861 OBD_ALLOC(dir, dirlen + 1);
2863 GOTO(out, rc = -ENOMEM);
2865 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2866 OBD_FREE(dir, data->ioc_plen1 + 1);
2867 GOTO(out, rc = -EFAULT);
2870 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2871 OBD_FREE(dir, dirlen + 1);
2874 case OBD_IOC_ECHO_ALLOC_SEQ: {
2878 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2879 GOTO(out, rc = -EPERM);
2881 rc = seq_client_get_seq(env, ed->ed_cl_seq, &seq);
2883 CERROR("%s: Can not alloc seq: rc = %d\n",
/* Return the new sequence and the max id width to userspace. */
2888 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2891 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2892 if (copy_to_user(data->ioc_pbuf2, &max_count,
2897 #endif /* HAVE_SERVER_SUPPORT */
2898 case OBD_IOC_DESTROY:
2899 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2900 GOTO(out, rc = -EPERM);
2902 rc = echo_get_object(&eco, ed, oa);
2904 rc = obd_destroy(env, ec->ec_exp, oa);
2906 eco->eo_deleted = 1;
2907 echo_put_object(eco);
2911 case OBD_IOC_GETATTR:
2912 rc = echo_get_object(&eco, ed, oa);
2914 rc = obd_getattr(env, ec->ec_exp, oa);
2915 echo_put_object(eco);
2919 case OBD_IOC_SETATTR:
2920 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2921 GOTO(out, rc = -EPERM);
2923 rc = echo_get_object(&eco, ed, oa);
2925 rc = obd_setattr(env, ec->ec_exp, oa);
2926 echo_put_object(eco);
/* BRW_WRITE falls through to the shared BRW path with rw flipped
 * (fallthrough body elided in this view). */
2930 case OBD_IOC_BRW_WRITE:
2931 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2932 GOTO(out, rc = -EPERM);
2936 case OBD_IOC_BRW_READ:
2937 rc = echo_client_brw_ioctl(env, rw, exp, data);
2941 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2942 GOTO(out, rc = -ENOTTY);
2948 cl_env_put(env, &refcheck);
/* obd setup for the echo client: validate the target obd named in
 * lcfg buffer 1, initialize the echo_client_obd lists/lock, register
 * the DT/session (and, on servers targeting an MDT, MD) context tags,
 * then obd_connect to the target with an echo-specific
 * obd_connect_data.  The pinger link is removed because the connection
 * goes directly to the target obd. */
2953 static int echo_client_setup(const struct lu_env *env,
2954 struct obd_device *obddev, struct lustre_cfg *lcfg)
2956 struct echo_client_obd *ec = &obddev->u.echo_client;
2957 struct obd_device *tgt;
2958 struct obd_uuid echo_uuid = { "ECHO_UUID" };
2959 struct obd_connect_data *ocd = NULL;
2963 if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2964 CERROR("requires a TARGET OBD name\n");
2968 tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2969 if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2970 CERROR("device not attached or not set up (%s)\n",
2971 lustre_cfg_string(lcfg, 1));
2975 spin_lock_init(&ec->ec_lock);
2976 INIT_LIST_HEAD(&ec->ec_objects);
2977 INIT_LIST_HEAD(&ec->ec_locks);
2980 lu_context_tags_update(ECHO_DT_CTX_TAG);
2981 lu_session_tags_update(ECHO_SES_TAG);
2983 if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2984 #ifdef HAVE_SERVER_SUPPORT
2985 lu_context_tags_update(ECHO_MD_CTX_TAG);
2988 "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
2993 OBD_ALLOC(ocd, sizeof(*ocd));
2995 CERROR("Can't alloc ocd connecting to %s\n",
2996 lustre_cfg_string(lcfg, 1));
3000 ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
3001 OBD_CONNECT_BRW_SIZE |
3002 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
3003 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
3005 ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
3006 ocd->ocd_version = LUSTRE_VERSION_CODE;
3007 ocd->ocd_group = FID_SEQ_ECHO;
3009 rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
3011 /* Turn off pinger because it connects to tgt obd directly. */
3012 spin_lock(&tgt->obd_dev_lock);
3013 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
3014 spin_unlock(&tgt->obd_dev_lock);
3017 OBD_FREE(ocd, sizeof(*ocd));
3020 CERROR("fail to connect to device %s\n",
3021 lustre_cfg_string(lcfg, 1));
3028 static int echo_client_cleanup(struct obd_device *obddev)
3030 struct echo_device *ed = obd2echo_dev(obddev);
3031 struct echo_client_obd *ec = &obddev->u.echo_client;
3035 /*Do nothing for Metadata echo client*/
3039 lu_session_tags_clear(ECHO_SES_TAG & ~LCT_SESSION);
3040 lu_context_tags_clear(ECHO_DT_CTX_TAG);
3041 if (ed->ed_next_ismd) {
3042 #ifdef HAVE_SERVER_SUPPORT
3043 lu_context_tags_clear(ECHO_MD_CTX_TAG);
3046 "This is client-side only module, does not support metadata echo client.\n");
3051 if (!list_empty(&obddev->obd_exports)) {
3052 CERROR("still has clients!\n");
3056 LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
3057 rc = obd_disconnect(ec->ec_exp);
3059 CERROR("fail to disconnect device: %d\n", rc);
3064 static int echo_client_connect(const struct lu_env *env,
3065 struct obd_export **exp,
3066 struct obd_device *src, struct obd_uuid *cluuid,
3067 struct obd_connect_data *data, void *localdata)
3070 struct lustre_handle conn = { 0 };
3073 rc = class_connect(&conn, src, cluuid);
3075 *exp = class_conn2export(&conn);
3080 static int echo_client_disconnect(struct obd_export *exp)
3086 GOTO(out, rc = -EINVAL);
3088 rc = class_disconnect(exp);
3094 static struct obd_ops echo_client_obd_ops = {
3095 .o_owner = THIS_MODULE,
3096 .o_iocontrol = echo_client_iocontrol,
3097 .o_connect = echo_client_connect,
3098 .o_disconnect = echo_client_disconnect
3101 static int __init obdecho_init(void)
3106 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
3108 LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
3110 # ifdef HAVE_SERVER_SUPPORT
3111 rc = echo_persistent_pages_init();
3115 rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
3116 LUSTRE_ECHO_NAME, &echo_srv_type);
3121 rc = lu_kmem_init(echo_caches);
3123 rc = class_register_type(&echo_client_obd_ops, NULL, false,
3124 NULL, LUSTRE_ECHO_CLIENT_NAME,
3127 lu_kmem_fini(echo_caches);
3130 # ifdef HAVE_SERVER_SUPPORT
3134 class_unregister_type(LUSTRE_ECHO_NAME);
3136 echo_persistent_pages_fini();
3142 static void __exit obdecho_exit(void)
3144 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
3145 lu_kmem_fini(echo_caches);
3147 #ifdef HAVE_SERVER_SUPPORT
3148 class_unregister_type(LUSTRE_ECHO_NAME);
3149 echo_persistent_pages_fini();
3153 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3154 MODULE_DESCRIPTION("Lustre Echo Client test driver");
3155 MODULE_VERSION(LUSTRE_VERSION_STRING);
3156 MODULE_LICENSE("GPL");
3158 module_init(obdecho_init);
3159 module_exit(obdecho_exit);
3161 /** @} echo_client */