4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
32 #define DEBUG_SUBSYSTEM S_ECHO
34 #include <linux/user_namespace.h>
35 #include <linux/uidgid.h>
37 #include <libcfs/libcfs.h>
39 #include <obd_support.h>
40 #include <obd_class.h>
41 #include <lprocfs_status.h>
42 #include <cl_object.h>
43 #include <lustre_fid.h>
44 #include <lustre_lmv.h>
45 #include <lustre_acl.h>
46 #include <uapi/linux/lustre/lustre_ioctl.h>
47 #include <lustre_net.h>
48 #ifdef HAVE_SERVER_SUPPORT
49 # include <md_object.h>
51 #define ETI_NAME_LEN 20
53 #endif /* HAVE_SERVER_SUPPORT */
55 #include "echo_internal.h"
57 /** \defgroup echo_client Echo Client
61 /* echo thread key have a CL_THREAD flag, which set cl_env function directly */
62 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
63 #define ECHO_DT_CTX_TAG (LCT_REMEMBER | LCT_DT_THREAD)
64 #define ECHO_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
67 struct cl_device ed_cl;
68 struct echo_client_obd *ed_ec;
70 struct cl_site ed_site_myself;
71 struct lu_site *ed_site;
72 struct lu_device *ed_next;
74 struct lu_client_seq *ed_cl_seq;
75 #ifdef HAVE_SERVER_SUPPORT
76 struct local_oid_storage *ed_los;
77 struct lu_fid ed_root_fid;
78 #endif /* HAVE_SERVER_SUPPORT */
82 struct cl_object eo_cl;
83 struct cl_object_header eo_hdr;
84 struct echo_device *eo_dev;
85 struct list_head eo_obj_chain;
86 struct lov_oinfo *eo_oinfo;
90 struct echo_object_conf {
91 struct cl_object_conf eoc_cl;
92 struct lov_oinfo **eoc_oinfo;
95 #ifdef HAVE_SERVER_SUPPORT
96 static const char echo_md_root_dir_name[] = "ROOT_ECHO";
99 * In order to use the values of members in struct mdd_device,
100 * we define an alias structure here.
102 struct echo_md_device {
103 struct md_device emd_md_dev;
104 struct obd_export *emd_child_exp;
105 struct dt_device *emd_child;
106 struct dt_device *emd_bottom;
107 struct lu_fid emd_root_fid;
108 struct lu_fid emd_local_root_fid;
110 #endif /* HAVE_SERVER_SUPPORT */
112 static int echo_client_setup(const struct lu_env *env,
113 struct obd_device *obd,
114 struct lustre_cfg *lcfg);
115 static int echo_client_cleanup(struct obd_device *obd);
117 /** \defgroup echo_helpers Helper functions
120 static struct echo_device *cl2echo_dev(const struct cl_device *dev)
122 return container_of_safe(dev, struct echo_device, ed_cl);
125 static struct cl_device *echo_dev2cl(struct echo_device *d)
130 static struct echo_device *obd2echo_dev(const struct obd_device *obd)
132 return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
135 static struct cl_object *echo_obj2cl(struct echo_object *eco)
140 static struct echo_object *cl2echo_obj(const struct cl_object *o)
142 return container_of(o, struct echo_object, eo_cl);
145 static struct lu_context_key echo_thread_key;
147 static struct echo_thread_info *echo_env_info(const struct lu_env *env)
149 struct echo_thread_info *info;
151 info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
152 LASSERT(info != NULL);
156 static struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
158 return container_of(c, struct echo_object_conf, eoc_cl);
161 #ifdef HAVE_SERVER_SUPPORT
162 static struct echo_md_device *lu2emd_dev(struct lu_device *d)
164 return container_of_safe(d, struct echo_md_device,
165 emd_md_dev.md_lu_dev);
168 static struct lu_device *emd2lu_dev(struct echo_md_device *d)
170 return &d->emd_md_dev.md_lu_dev;
173 static struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
175 return emd2lu_dev(d)->ld_site->ld_seq_site;
178 static struct obd_device *emd2obd_dev(struct echo_md_device *d)
180 return d->emd_md_dev.md_lu_dev.ld_obd;
182 #endif /* HAVE_SERVER_SUPPORT */
184 /** @} echo_helpers */
186 static int cl_echo_object_put(struct echo_object *eco);
188 struct echo_thread_info {
189 struct echo_object_conf eti_conf;
190 struct lustre_md eti_md;
191 struct lu_fid eti_fid;
192 struct lu_fid eti_fid2;
193 #ifdef HAVE_SERVER_SUPPORT
194 struct md_op_spec eti_spec;
195 struct lov_mds_md_v3 eti_lmm;
196 struct lov_user_md_v3 eti_lum;
197 struct md_attr eti_ma;
198 struct lu_name eti_lname;
199 /* per-thread values, can be re-used */
200 void *eti_big_lmm; /* may be vmalloc'd */
202 char eti_name[ETI_NAME_LEN];
203 struct lu_buf eti_buf;
204 /* If we want to test large ACL, then need to enlarge the buffer. */
205 char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE_OLD];
209 /* No session used right now */
210 struct echo_session_info {
214 static struct kmem_cache *echo_object_kmem;
215 static struct kmem_cache *echo_thread_kmem;
216 static struct kmem_cache *echo_session_kmem;
218 static struct lu_kmem_descr echo_caches[] = {
220 .ckd_cache = &echo_object_kmem,
221 .ckd_name = "echo_object_kmem",
222 .ckd_size = sizeof(struct echo_object)
225 .ckd_cache = &echo_thread_kmem,
226 .ckd_name = "echo_thread_kmem",
227 .ckd_size = sizeof(struct echo_thread_info)
230 .ckd_cache = &echo_session_kmem,
231 .ckd_name = "echo_session_kmem",
232 .ckd_size = sizeof(struct echo_session_info)
239 /** \defgroup echo_lu_ops lu_object operations
241 * operations for echo lu object.
245 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
246 const struct lu_object_conf *conf)
248 struct echo_device *ed = cl2echo_dev(lu2cl_dev(obj->lo_dev));
249 struct echo_client_obd *ec = ed->ed_ec;
250 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
254 struct lu_object *below;
255 struct lu_device *under;
258 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
262 lu_object_add(obj, below);
265 if (!ed->ed_next_ismd) {
266 const struct cl_object_conf *cconf = lu2cl_conf(conf);
267 struct echo_object_conf *econf = cl2echo_conf(cconf);
269 LASSERT(econf->eoc_oinfo != NULL);
272 * Transfer the oinfo pointer to eco that it won't be
275 eco->eo_oinfo = *econf->eoc_oinfo;
276 *econf->eoc_oinfo = NULL;
278 eco->eo_oinfo = NULL;
282 cl_object_page_init(lu2cl(obj), 0);
284 spin_lock(&ec->ec_lock);
285 list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
286 spin_unlock(&ec->ec_lock);
291 static void echo_object_delete(const struct lu_env *env, struct lu_object *obj)
293 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
294 struct echo_client_obd *ec;
298 /* object delete called unconditolally - layer init or not */
299 if (eco->eo_dev == NULL)
302 ec = eco->eo_dev->ed_ec;
304 spin_lock(&ec->ec_lock);
305 list_del_init(&eco->eo_obj_chain);
306 spin_unlock(&ec->ec_lock);
309 OBD_FREE_PTR(eco->eo_oinfo);
312 static void echo_object_free_rcu(struct rcu_head *head)
314 struct echo_object *eco = container_of(head, struct echo_object,
315 eo_hdr.coh_lu.loh_rcu);
317 kmem_cache_free(echo_object_kmem, eco);
320 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
322 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
327 lu_object_header_fini(obj->lo_header);
329 OBD_FREE_PRE(eco, sizeof(*eco), "slab-freed");
330 call_rcu(&eco->eo_hdr.coh_lu.loh_rcu, echo_object_free_rcu);
334 static int echo_object_print(const struct lu_env *env, void *cookie,
335 lu_printer_t p, const struct lu_object *o)
337 struct echo_object *obj = cl2echo_obj(lu2cl(o));
339 return (*p)(env, cookie, "echoclient-object@%p", obj);
342 static const struct lu_object_operations echo_lu_obj_ops = {
343 .loo_object_init = echo_object_init,
344 .loo_object_delete = echo_object_delete,
345 .loo_object_release = NULL,
346 .loo_object_free = echo_object_free,
347 .loo_object_print = echo_object_print,
348 .loo_object_invariant = NULL
350 /** @} echo_lu_ops */
352 /** \defgroup echo_lu_dev_ops lu_device operations
354 * Operations for echo lu device.
358 static struct lu_object *echo_object_alloc(const struct lu_env *env,
359 const struct lu_object_header *hdr,
360 struct lu_device *dev)
362 struct echo_object *eco;
363 struct lu_object *obj = NULL;
366 /* we're the top dev. */
367 LASSERT(hdr == NULL);
368 OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
370 struct cl_object_header *hdr = &eco->eo_hdr;
372 obj = &echo_obj2cl(eco)->co_lu;
373 cl_object_header_init(hdr);
374 hdr->coh_page_bufsize = round_up(sizeof(struct cl_page), 8);
376 lu_object_init(obj, &hdr->coh_lu, dev);
377 lu_object_add_top(&hdr->coh_lu, obj);
378 obj->lo_ops = &echo_lu_obj_ops;
383 static const struct lu_device_operations echo_device_lu_ops = {
384 .ldo_object_alloc = echo_object_alloc,
387 /** @} echo_lu_dev_ops */
389 /** \defgroup echo_init Setup and teardown
391 * Init and fini functions for echo client.
395 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
397 struct cl_site *site = &ed->ed_site_myself;
400 /* initialize site */
401 rc = cl_site_init(site, &ed->ed_cl);
403 CERROR("Cannot initialize site for echo client(%d)\n", rc);
407 rc = lu_site_init_finish(&site->cs_lu);
413 ed->ed_site = &site->cs_lu;
417 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
420 if (!ed->ed_next_ismd)
421 lu_site_fini(ed->ed_site);
426 static void *echo_thread_key_init(const struct lu_context *ctx,
427 struct lu_context_key *key)
429 struct echo_thread_info *info;
431 OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
433 info = ERR_PTR(-ENOMEM);
437 static void echo_thread_key_fini(const struct lu_context *ctx,
438 struct lu_context_key *key, void *data)
440 struct echo_thread_info *info = data;
442 OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
445 static struct lu_context_key echo_thread_key = {
446 .lct_tags = LCT_CL_THREAD,
447 .lct_init = echo_thread_key_init,
448 .lct_fini = echo_thread_key_fini,
451 static void *echo_session_key_init(const struct lu_context *ctx,
452 struct lu_context_key *key)
454 struct echo_session_info *session;
456 OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
458 session = ERR_PTR(-ENOMEM);
462 static void echo_session_key_fini(const struct lu_context *ctx,
463 struct lu_context_key *key, void *data)
465 struct echo_session_info *session = data;
467 OBD_SLAB_FREE_PTR(session, echo_session_kmem);
470 static struct lu_context_key echo_session_key = {
471 .lct_tags = LCT_SESSION,
472 .lct_init = echo_session_key_init,
473 .lct_fini = echo_session_key_fini,
476 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
478 #ifdef HAVE_SERVER_SUPPORT
479 # define ECHO_SEQ_WIDTH 0xffffffff
480 static int echo_fid_init(struct echo_device *ed, char *obd_name,
481 struct seq_server_site *ss)
487 OBD_ALLOC_PTR(ed->ed_cl_seq);
491 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
493 GOTO(out_free_seq, rc = -ENOMEM);
495 snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
497 /* Init client side sequence-manager */
498 seq_client_init(ed->ed_cl_seq, NULL,
500 prefix, ss->ss_server_seq);
501 ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
502 OBD_FREE(prefix, MAX_OBD_NAME + 5);
507 OBD_FREE_PTR(ed->ed_cl_seq);
508 ed->ed_cl_seq = NULL;
512 static int echo_fid_fini(struct obd_device *obd)
514 struct echo_device *ed = obd2echo_dev(obd);
518 seq_client_fini(ed->ed_cl_seq);
519 OBD_FREE_PTR(ed->ed_cl_seq);
520 ed->ed_cl_seq = NULL;
526 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
529 if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
530 local_oid_storage_fini(env, ed->ed_los);
536 echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
537 struct local_oid_storage *los,
538 const struct lu_fid *pfid, const char *name,
539 __u32 mode, struct lu_fid *fid)
541 struct dt_object *parent = NULL;
542 struct dt_object *dto = NULL;
546 LASSERT(!fid_is_zero(pfid));
547 parent = dt_locate(env, emd->emd_bottom, pfid);
548 if (unlikely(IS_ERR(parent)))
549 RETURN(PTR_ERR(parent));
551 /* create local file with @fid */
552 dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
555 GOTO(out_put, rc = PTR_ERR(dto));
557 *fid = *lu_object_fid(&dto->do_lu);
559 * since stack is not fully set up the local_storage uses own stack
560 * and we should drop its object from cache
562 dt_object_put_nocache(env, dto);
566 dt_object_put(env, parent);
571 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
572 struct echo_device *ed)
578 /* Setup local dirs */
579 fid.f_seq = FID_SEQ_LOCAL_NAME;
582 rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
586 lu_echo_root_fid(&fid);
587 if (echo_md_seq_site(emd)->ss_node_id == 0) {
588 rc = echo_md_local_file_create(env, emd, ed->ed_los,
589 &emd->emd_local_root_fid,
590 echo_md_root_dir_name, S_IFDIR |
591 S_IRUGO | S_IWUSR | S_IXUGO,
594 CERROR("%s: create md echo root fid failed: rc = %d\n",
595 emd2obd_dev(emd)->obd_name, rc);
599 ed->ed_root_fid = fid;
603 echo_ed_los_fini(env, ed);
607 #endif /* HAVE_SERVER_SUPPORT */
609 static struct lu_device *echo_device_alloc(const struct lu_env *env,
610 struct lu_device_type *t,
611 struct lustre_cfg *cfg)
613 struct lu_device *next;
614 struct echo_device *ed;
615 struct cl_device *cd;
616 struct obd_device *obd = NULL; /* to keep compiler happy */
617 struct obd_device *tgt;
618 const char *tgt_type_name;
625 GOTO(out, rc = -ENOMEM);
629 rc = cl_device_init(cd, t);
633 cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
636 obd = class_name2obd(lustre_cfg_string(cfg, 0));
637 LASSERT(obd != NULL);
638 LASSERT(env != NULL);
640 tgt = class_name2obd(lustre_cfg_string(cfg, 1));
642 CERROR("Can not find tgt device %s\n",
643 lustre_cfg_string(cfg, 1));
644 GOTO(out, rc = -ENODEV);
647 next = tgt->obd_lu_dev;
649 if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
650 ed->ed_next_ismd = 1;
651 } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
652 strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
653 ed->ed_next_ismd = 0;
654 rc = echo_site_init(env, ed);
658 GOTO(out, rc = -EINVAL);
663 rc = echo_client_setup(env, obd, cfg);
667 ed->ed_ec = &obd->u.echo_client;
670 if (ed->ed_next_ismd) {
671 #ifdef HAVE_SERVER_SUPPORT
672 /* Suppose to connect to some Metadata layer */
673 struct lu_site *ls = NULL;
674 struct lu_device *ld = NULL;
675 struct md_device *md = NULL;
676 struct echo_md_device *emd = NULL;
680 CERROR("%s is not lu device type!\n",
681 lustre_cfg_string(cfg, 1));
682 GOTO(out, rc = -EINVAL);
685 tgt_type_name = lustre_cfg_string(cfg, 2);
686 if (!tgt_type_name || strcmp(tgt_type_name, "mdd")) {
687 CERROR("%s no type name for echo %s setup\n",
688 lustre_cfg_string(cfg, 1),
689 tgt->obd_type->typ_name);
690 GOTO(out, rc = -EINVAL);
695 spin_lock(&ls->ls_ld_lock);
696 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
697 if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
702 spin_unlock(&ls->ls_ld_lock);
705 CERROR("%s is not lu device type!\n",
706 lustre_cfg_string(cfg, 1));
707 GOTO(out, rc = -EINVAL);
711 /* For MD echo client, it will use the site in MDS stack */
713 ed->ed_cl.cd_lu_dev.ld_site = ls;
714 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
716 CERROR("echo fid init error %d\n", rc);
720 md = lu2md_dev(next);
721 emd = lu2emd_dev(&md->md_lu_dev);
722 rc = echo_md_root_get(env, emd, ed);
724 CERROR("%s: get root error: rc = %d\n",
725 emd2obd_dev(emd)->obd_name, rc);
728 #else /* !HAVE_SERVER_SUPPORT */
730 "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
731 GOTO(out, rc = -EOPNOTSUPP);
732 #endif /* HAVE_SERVER_SUPPORT */
735 * if echo client is to be stacked upon ost device, the next is
736 * NULL since ost is not a clio device so far
738 if (next != NULL && !lu_device_is_cl(next))
741 tgt_type_name = tgt->obd_type->typ_name;
743 LASSERT(next != NULL);
745 GOTO(out, rc = -EBUSY);
747 next->ld_site = ed->ed_site;
748 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
749 next->ld_type->ldt_name,
754 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
759 RETURN(&cd->cd_lu_dev);
765 rc2 = echo_client_cleanup(obd);
767 CERROR("Cleanup obd device %s error(%d)\n",
773 echo_site_fini(env, ed);
776 cl_device_fini(&ed->ed_cl);
788 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
789 const char *name, struct lu_device *next)
795 static struct lu_device *echo_device_fini(const struct lu_env *env,
798 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
799 struct lu_device *next = ed->ed_next;
801 while (next && !ed->ed_next_ismd)
802 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
806 static struct lu_device *echo_device_free(const struct lu_env *env,
809 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
810 struct echo_client_obd *ec = ed->ed_ec;
811 struct echo_object *eco;
812 struct lu_device *next = ed->ed_next;
814 CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
817 lu_site_purge(env, ed->ed_site, -1);
820 * check if there are objects still alive.
821 * It shouldn't have any object because lu_site_purge would cleanup
822 * all of cached objects. Anyway, probably the echo device is being
823 * parallelly accessed.
825 spin_lock(&ec->ec_lock);
826 list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
828 spin_unlock(&ec->ec_lock);
831 lu_site_purge(env, ed->ed_site, -1);
834 "Waiting for the reference of echo object to be dropped\n");
836 /* Wait for the last reference to be dropped. */
837 spin_lock(&ec->ec_lock);
838 while (!list_empty(&ec->ec_objects)) {
839 spin_unlock(&ec->ec_lock);
841 "echo_client still has objects at cleanup time, wait for 1 second\n");
842 schedule_timeout_uninterruptible(cfs_time_seconds(1));
843 lu_site_purge(env, ed->ed_site, -1);
844 spin_lock(&ec->ec_lock);
846 spin_unlock(&ec->ec_lock);
848 LASSERT(list_empty(&ec->ec_locks));
850 CDEBUG(D_INFO, "No object exists, exiting...\n");
852 echo_client_cleanup(d->ld_obd);
853 #ifdef HAVE_SERVER_SUPPORT
854 echo_fid_fini(d->ld_obd);
855 echo_ed_los_fini(env, ed);
857 while (next && !ed->ed_next_ismd)
858 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
860 LASSERT(ed->ed_site == d->ld_site);
861 echo_site_fini(env, ed);
862 cl_device_fini(&ed->ed_cl);
865 cl_env_cache_purge(~0);
870 static const struct lu_device_type_operations echo_device_type_ops = {
871 .ldto_init = echo_type_init,
872 .ldto_fini = echo_type_fini,
874 .ldto_start = echo_type_start,
875 .ldto_stop = echo_type_stop,
877 .ldto_device_alloc = echo_device_alloc,
878 .ldto_device_free = echo_device_free,
879 .ldto_device_init = echo_device_init,
880 .ldto_device_fini = echo_device_fini
883 static struct lu_device_type echo_device_type = {
884 .ldt_tags = LU_DEVICE_CL,
885 .ldt_name = LUSTRE_ECHO_CLIENT_NAME,
886 .ldt_ops = &echo_device_type_ops,
887 .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
891 /** \defgroup echo_exports Exported operations
893 * exporting functions to echo client
898 /* Interfaces to echo client obd device */
899 static struct echo_object *
900 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
903 struct echo_thread_info *info;
904 struct echo_object_conf *conf;
905 struct echo_object *eco;
906 struct cl_object *obj;
907 struct lov_oinfo *oinfo = NULL;
913 LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
914 LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
916 /* Never return an object if the obd is to be freed. */
917 if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
918 RETURN(ERR_PTR(-ENODEV));
920 env = cl_env_get(&refcheck);
924 info = echo_env_info(env);
925 conf = &info->eti_conf;
927 OBD_ALLOC_PTR(oinfo);
929 GOTO(out, eco = ERR_PTR(-ENOMEM));
932 conf->eoc_cl.u.coc_oinfo = oinfo;
936 * If echo_object_init() is successful then ownership of oinfo
937 * is transferred to the object.
939 conf->eoc_oinfo = &oinfo;
941 fid = &info->eti_fid;
942 rc = ostid_to_fid(fid, oi, 0);
944 GOTO(out, eco = ERR_PTR(rc));
947 * In the function below, .hs_keycmp resolves to
948 * lu_obj_hop_keycmp()
950 obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
952 GOTO(out, eco = (void *)obj);
954 eco = cl2echo_obj(obj);
955 if (eco->eo_deleted) {
956 cl_object_put(env, obj);
957 eco = ERR_PTR(-EAGAIN);
964 cl_env_put(env, &refcheck);
968 static int cl_echo_object_put(struct echo_object *eco)
971 struct cl_object *obj = echo_obj2cl(eco);
975 env = cl_env_get(&refcheck);
977 RETURN(PTR_ERR(env));
979 /* an external function to kill an object? */
980 if (eco->eo_deleted) {
981 struct lu_object_header *loh = obj->co_lu.lo_header;
983 LASSERT(&eco->eo_hdr == luh2coh(loh));
984 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
987 cl_object_put(env, obj);
988 cl_env_put(env, &refcheck);
992 /** @} echo_exports */
994 static u64 last_object_id;
996 #ifdef HAVE_SERVER_SUPPORT
997 static void echo_md_build_name(struct lu_name *lname, char *name,
1000 snprintf(name, ETI_NAME_LEN, "%llu", id);
1001 lname->ln_name = name;
1002 lname->ln_namelen = strlen(name);
1005 /* similar to mdt_attr_get_complex */
1006 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1009 struct echo_thread_info *info = echo_env_info(env);
1014 LASSERT(ma->ma_lmm_size > 0);
1016 LASSERT(ma->ma_need & (MA_LOV | MA_LMV));
1017 if (ma->ma_need & MA_LOV)
1018 rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1020 rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LMV);
1025 /* big_lmm may need to be grown */
1026 if (info->eti_big_lmmsize < rc) {
1027 int size = size_roundup_power2(rc);
1029 if (info->eti_big_lmmsize > 0) {
1030 /* free old buffer */
1031 LASSERT(info->eti_big_lmm);
1032 OBD_FREE_LARGE(info->eti_big_lmm,
1033 info->eti_big_lmmsize);
1034 info->eti_big_lmm = NULL;
1035 info->eti_big_lmmsize = 0;
1038 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1039 if (!info->eti_big_lmm)
1041 info->eti_big_lmmsize = size;
1043 LASSERT(info->eti_big_lmmsize >= rc);
1045 info->eti_buf.lb_buf = info->eti_big_lmm;
1046 info->eti_buf.lb_len = info->eti_big_lmmsize;
1047 if (ma->ma_need & MA_LOV)
1048 rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1050 rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LMV);
1054 if (ma->ma_need & MA_LOV)
1055 ma->ma_valid |= MA_LOV;
1057 ma->ma_valid |= MA_LMV;
1059 ma->ma_lmm = info->eti_big_lmm;
1060 ma->ma_lmm_size = rc;
1065 static int echo_attr_get_complex(const struct lu_env *env,
1066 struct md_object *next,
1069 struct echo_thread_info *info = echo_env_info(env);
1070 struct lu_buf *buf = &info->eti_buf;
1071 umode_t mode = lu_object_attr(&next->mo_lu);
1078 if (ma->ma_need & MA_INODE) {
1079 rc = mo_attr_get(env, next, ma);
1082 ma->ma_valid |= MA_INODE;
1085 if ((ma->ma_need & MA_LOV) && (S_ISREG(mode) || S_ISDIR(mode))) {
1086 LASSERT(ma->ma_lmm_size > 0);
1087 buf->lb_buf = ma->ma_lmm;
1088 buf->lb_len = ma->ma_lmm_size;
1089 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1091 ma->ma_lmm_size = rc2;
1092 ma->ma_valid |= MA_LOV;
1093 } else if (rc2 == -ENODATA) {
1095 ma->ma_lmm_size = 0;
1096 } else if (rc2 == -ERANGE) {
1097 rc2 = echo_big_lmm_get(env, next, ma);
1099 GOTO(out, rc = rc2);
1101 GOTO(out, rc = rc2);
1105 if ((ma->ma_need & MA_LMV) && S_ISDIR(mode)) {
1106 LASSERT(ma->ma_lmm_size > 0);
1107 buf->lb_buf = ma->ma_lmm;
1108 buf->lb_len = ma->ma_lmm_size;
1109 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV);
1111 ma->ma_lmm_size = rc2;
1112 ma->ma_valid |= MA_LMV;
1113 } else if (rc2 == -ENODATA) {
1115 ma->ma_lmm_size = 0;
1116 } else if (rc2 == -ERANGE) {
1117 rc2 = echo_big_lmm_get(env, next, ma);
1119 GOTO(out, rc = rc2);
1121 GOTO(out, rc = rc2);
1125 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1126 if ((ma->ma_need & MA_ACL_DEF) && S_ISDIR(mode)) {
1127 buf->lb_buf = ma->ma_acl;
1128 buf->lb_len = ma->ma_acl_size;
1129 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1131 ma->ma_acl_size = rc2;
1132 ma->ma_valid |= MA_ACL_DEF;
1133 } else if (rc2 == -ENODATA) {
1135 ma->ma_acl_size = 0;
1137 GOTO(out, rc = rc2);
1142 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1143 rc, ma->ma_valid, ma->ma_lmm);
1148 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1149 struct md_object *parent, struct lu_fid *fid,
1150 struct lu_name *lname, struct md_op_spec *spec,
1153 struct lu_object *ec_child, *child;
1154 struct lu_device *ld = ed->ed_next;
1155 struct echo_thread_info *info = echo_env_info(env);
1156 struct lu_fid *fid2 = &info->eti_fid2;
1157 struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
1162 rc = mdo_lookup(env, parent, lname, fid2, spec);
1165 else if (rc != -ENOENT)
1168 ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1170 if (IS_ERR(ec_child)) {
1171 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1173 RETURN(PTR_ERR(ec_child));
1176 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1178 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1179 GOTO(out_put, rc = -EINVAL);
1182 CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1183 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1186 * Do not perform lookup sanity check. We know that name does not exist.
1188 spec->sp_cr_lookup = 0;
1189 rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1191 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1194 CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc = %d\n",
1195 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1198 lu_object_put(env, ec_child);
1202 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1205 struct echo_thread_info *info = echo_env_info(env);
1207 if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1208 ma->ma_lmm = (void *)&info->eti_lmm;
1209 ma->ma_lmm_size = sizeof(info->eti_lmm);
1211 LASSERT(info->eti_big_lmmsize);
1212 ma->ma_lmm = info->eti_big_lmm;
1213 ma->ma_lmm_size = info->eti_big_lmmsize;
1220 echo_md_dir_stripe_choose(const struct lu_env *env, struct echo_device *ed,
1221 struct lu_object *obj, const char *name,
1222 unsigned int namelen, __u64 id,
1223 struct lu_object **new_parent)
1225 struct echo_thread_info *info = echo_env_info(env);
1226 struct md_attr *ma = &info->eti_ma;
1227 struct lmv_mds_md_v1 *lmv;
1228 struct lu_device *ld = ed->ed_next;
1230 struct lu_name tmp_ln_name;
1231 struct lu_fid stripe_fid;
1232 struct lu_object *stripe_obj;
1235 LASSERT(obj != NULL);
1236 LASSERT(S_ISDIR(obj->lo_header->loh_attr));
1238 memset(ma, 0, sizeof(*ma));
1239 echo_set_lmm_size(env, ld, ma);
1240 ma->ma_need = MA_LMV;
1241 rc = echo_attr_get_complex(env, lu2md(obj), ma);
1243 CERROR("Can not getattr child "DFID": rc = %d\n",
1244 PFID(lu_object_fid(obj)), rc);
1248 if (!(ma->ma_valid & MA_LMV)) {
1253 lmv = (struct lmv_mds_md_v1 *)ma->ma_lmm;
1254 if (!lmv_is_sane(lmv)) {
1256 CERROR("Invalid mds md magic %x "DFID": rc = %d\n",
1257 le32_to_cpu(lmv->lmv_magic), PFID(lu_object_fid(obj)),
1263 tmp_ln_name.ln_name = name;
1264 tmp_ln_name.ln_namelen = namelen;
1267 echo_md_build_name(&tmp_ln_name, info->eti_name, id);
1270 idx = lmv_name_to_stripe_index(lmv, tmp_ln_name.ln_name,
1271 tmp_ln_name.ln_namelen);
1273 LASSERT(idx < le32_to_cpu(lmv->lmv_stripe_count));
1274 fid_le_to_cpu(&stripe_fid, &lmv->lmv_stripe_fids[idx]);
1276 stripe_obj = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, &stripe_fid,
1278 if (IS_ERR(stripe_obj)) {
1279 rc = PTR_ERR(stripe_obj);
1280 CERROR("Can not find the parent "DFID": rc = %d\n",
1281 PFID(&stripe_fid), rc);
1285 *new_parent = lu_object_locate(stripe_obj->lo_header, ld->ld_type);
1287 lu_object_put(env, stripe_obj);
1294 static int echo_create_md_object(const struct lu_env *env,
1295 struct echo_device *ed,
1296 struct lu_object *ec_parent,
1298 char *name, int namelen,
1299 __u64 id, __u32 mode, int count,
1300 int stripe_count, int stripe_offset)
1302 struct lu_object *parent;
1303 struct lu_object *new_parent;
1304 struct echo_thread_info *info = echo_env_info(env);
1305 struct lu_name *lname = &info->eti_lname;
1306 struct md_op_spec *spec = &info->eti_spec;
1307 struct md_attr *ma = &info->eti_ma;
1308 struct lu_device *ld = ed->ed_next;
1316 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1320 rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
1325 LASSERT(new_parent != NULL);
1326 memset(ma, 0, sizeof(*ma));
1327 memset(spec, 0, sizeof(*spec));
1328 echo_set_lmm_size(env, ld, ma);
1329 if (stripe_count != 0) {
1330 spec->sp_cr_flags |= MDS_FMODE_WRITE;
1331 if (stripe_count != -1) {
1332 if (S_ISDIR(mode)) {
1333 struct lmv_user_md *lmu;
1335 lmu = (struct lmv_user_md *)&info->eti_lum;
1336 lmu->lum_magic = LMV_USER_MAGIC;
1337 lmu->lum_stripe_offset = stripe_offset;
1338 lmu->lum_stripe_count = stripe_count;
1339 lmu->lum_hash_type = LMV_HASH_TYPE_FNV_1A_64;
1340 spec->u.sp_ea.eadata = lmu;
1341 spec->u.sp_ea.eadatalen = sizeof(*lmu);
1343 struct lov_user_md_v3 *lum = &info->eti_lum;
1345 lum->lmm_magic = LOV_USER_MAGIC_V3;
1346 lum->lmm_stripe_count = stripe_count;
1347 lum->lmm_stripe_offset = stripe_offset;
1348 lum->lmm_pattern = LOV_PATTERN_NONE;
1349 spec->u.sp_ea.eadata = lum;
1350 spec->u.sp_ea.eadatalen = sizeof(*lum);
1352 spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1356 ma->ma_attr.la_mode = mode;
1357 ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1358 ma->ma_attr.la_ctime = ktime_get_real_seconds();
1361 lname->ln_name = name;
1362 lname->ln_namelen = namelen;
1363 /* If name is specified, only create one object by name */
1364 rc = echo_md_create_internal(env, ed, lu2md(new_parent), fid,
1369 /* Create multiple object sequenced by id */
1370 for (i = 0; i < count; i++) {
1371 char *tmp_name = info->eti_name;
1373 echo_md_build_name(lname, tmp_name, id);
1375 rc = echo_md_create_internal(env, ed, lu2md(new_parent),
1376 fid, lname, spec, ma);
1378 CERROR("Can not create child %s: rc = %d\n", tmp_name,
1387 if (new_parent != parent)
1388 lu_object_put(env, new_parent);
1393 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1394 struct echo_device *ed,
1395 struct md_object *parent,
1396 struct lu_name *lname)
1398 struct echo_thread_info *info = echo_env_info(env);
1399 struct lu_fid *fid = &info->eti_fid;
1400 struct lu_object *child;
1404 CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1407 rc = mdo_lookup(env, parent, lname, fid, NULL);
1409 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1410 RETURN(ERR_PTR(rc));
1414 * In the function below, .hs_keycmp resolves to
1415 * lu_obj_hop_keycmp()
1417 child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1422 static int echo_setattr_object(const struct lu_env *env,
1423 struct echo_device *ed,
1424 struct lu_object *ec_parent,
1425 __u64 id, int count)
1427 struct lu_object *parent;
1428 struct lu_object *new_parent;
1429 struct echo_thread_info *info = echo_env_info(env);
1430 struct lu_name *lname = &info->eti_lname;
1431 char *name = info->eti_name;
1432 struct lu_device *ld = ed->ed_next;
1433 struct lu_buf *buf = &info->eti_buf;
1441 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1445 rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
1450 for (i = 0; i < count; i++) {
1451 struct lu_object *ec_child, *child;
1453 echo_md_build_name(lname, name, id);
1455 ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
1456 if (IS_ERR(ec_child)) {
1457 rc = PTR_ERR(ec_child);
1458 CERROR("Can't find child %s: rc = %d\n",
1459 lname->ln_name, rc);
1463 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1465 CERROR("Can not locate the child %s\n", lname->ln_name);
1466 lu_object_put(env, ec_child);
1471 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1472 PFID(lu_object_fid(child)));
1474 buf->lb_buf = info->eti_xattr_buf;
1475 buf->lb_len = sizeof(info->eti_xattr_buf);
1477 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1478 rc = mo_xattr_set(env, lu2md(child), buf, name,
1481 CERROR("Can not setattr child "DFID": rc = %d\n",
1482 PFID(lu_object_fid(child)), rc);
1483 lu_object_put(env, ec_child);
1486 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1487 PFID(lu_object_fid(child)));
1489 lu_object_put(env, ec_child);
1492 if (new_parent != parent)
1493 lu_object_put(env, new_parent);
1498 static int echo_getattr_object(const struct lu_env *env,
1499 struct echo_device *ed,
1500 struct lu_object *ec_parent,
1501 __u64 id, int count)
1503 struct lu_object *parent;
1504 struct lu_object *new_parent;
1505 struct echo_thread_info *info = echo_env_info(env);
1506 struct lu_name *lname = &info->eti_lname;
1507 char *name = info->eti_name;
1508 struct md_attr *ma = &info->eti_ma;
1509 struct lu_device *ld = ed->ed_next;
1517 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1521 rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
1526 memset(ma, 0, sizeof(*ma));
1527 ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1528 ma->ma_acl = info->eti_xattr_buf;
1529 ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1531 for (i = 0; i < count; i++) {
1532 struct lu_object *ec_child, *child;
1535 echo_md_build_name(lname, name, id);
1536 echo_set_lmm_size(env, ld, ma);
1538 ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
1539 if (IS_ERR(ec_child)) {
1540 CERROR("Can't find child %s: rc = %ld\n",
1541 lname->ln_name, PTR_ERR(ec_child));
1542 RETURN(PTR_ERR(ec_child));
1545 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1547 CERROR("Can not locate the child %s\n", lname->ln_name);
1548 lu_object_put(env, ec_child);
1552 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1553 PFID(lu_object_fid(child)));
1554 rc = echo_attr_get_complex(env, lu2md(child), ma);
1556 CERROR("Can not getattr child "DFID": rc = %d\n",
1557 PFID(lu_object_fid(child)), rc);
1558 lu_object_put(env, ec_child);
1561 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1562 PFID(lu_object_fid(child)));
1564 lu_object_put(env, ec_child);
1567 if (new_parent != parent)
1568 lu_object_put(env, new_parent);
1573 static int echo_lookup_object(const struct lu_env *env,
1574 struct echo_device *ed,
1575 struct lu_object *ec_parent,
1576 __u64 id, int count)
1578 struct lu_object *parent;
1579 struct lu_object *new_parent;
1580 struct echo_thread_info *info = echo_env_info(env);
1581 struct lu_name *lname = &info->eti_lname;
1582 char *name = info->eti_name;
1583 struct lu_fid *fid = &info->eti_fid;
1584 struct lu_device *ld = ed->ed_next;
1590 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1594 rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
1599 /*prepare the requests*/
1600 for (i = 0; i < count; i++) {
1601 echo_md_build_name(lname, name, id);
1603 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1604 PFID(lu_object_fid(new_parent)), lname->ln_name,
1607 rc = mdo_lookup(env, lu2md(new_parent), lname, fid, NULL);
1609 CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1613 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1614 PFID(lu_object_fid(new_parent)), lname->ln_name,
1620 if (new_parent != parent)
1621 lu_object_put(env, new_parent);
1626 static int echo_md_destroy_internal(const struct lu_env *env,
1627 struct echo_device *ed,
1628 struct md_object *parent,
1629 struct lu_name *lname,
1632 struct lu_device *ld = ed->ed_next;
1633 struct lu_object *ec_child;
1634 struct lu_object *child;
1639 ec_child = echo_md_lookup(env, ed, parent, lname);
1640 if (IS_ERR(ec_child)) {
1641 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1643 RETURN(PTR_ERR(ec_child));
1646 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1648 CERROR("Can not locate the child %s\n", lname->ln_name);
1649 GOTO(out_put, rc = -EINVAL);
1652 if (lu_object_remote(child)) {
1653 CERROR("Can not destroy remote object %s: rc = %d\n",
1654 lname->ln_name, -EPERM);
1655 GOTO(out_put, rc = -EPERM);
1657 CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1658 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1660 rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
1662 CERROR("Can not unlink child %s: rc = %d\n",
1663 lname->ln_name, rc);
1666 CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1667 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1669 lu_object_put(env, ec_child);
1673 static int echo_destroy_object(const struct lu_env *env,
1674 struct echo_device *ed,
1675 struct lu_object *ec_parent,
1676 char *name, int namelen,
1677 __u64 id, __u32 mode,
1680 struct echo_thread_info *info = echo_env_info(env);
1681 struct lu_name *lname = &info->eti_lname;
1682 struct md_attr *ma = &info->eti_ma;
1683 struct lu_device *ld = ed->ed_next;
1684 struct lu_object *parent;
1685 struct lu_object *new_parent;
1690 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1694 rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
1699 memset(ma, 0, sizeof(*ma));
1700 ma->ma_attr.la_mode = mode;
1701 ma->ma_attr.la_valid = LA_CTIME;
1702 ma->ma_attr.la_ctime = ktime_get_real_seconds();
1703 ma->ma_need = MA_INODE;
1707 lname->ln_name = name;
1708 lname->ln_namelen = namelen;
1709 rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
1714 /*prepare the requests*/
1715 for (i = 0; i < count; i++) {
1716 char *tmp_name = info->eti_name;
1719 echo_md_build_name(lname, tmp_name, id);
1721 rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
1724 CERROR("Can not unlink child %s: rc = %d\n", name, rc);
1731 if (new_parent != parent)
1732 lu_object_put(env, new_parent);
1737 static struct lu_object *echo_resolve_path(const struct lu_env *env,
1738 struct echo_device *ed, char *path,
1741 struct lu_device *ld = ed->ed_next;
1742 struct echo_thread_info *info = echo_env_info(env);
1743 struct lu_fid *fid = &info->eti_fid;
1744 struct lu_name *lname = &info->eti_lname;
1745 struct lu_object *parent = NULL;
1746 struct lu_object *child = NULL;
1750 *fid = ed->ed_root_fid;
1753 * In the function below, .hs_keycmp resolves to
1754 * lu_obj_hop_keycmp()
1756 parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1757 if (IS_ERR(parent)) {
1758 CERROR("Can not find the parent "DFID": rc = %ld\n",
1759 PFID(fid), PTR_ERR(parent));
1764 struct lu_object *ld_parent;
1767 e = strsep(&path, "/");
1772 if (!path || path[0] == '\0')
1778 lname->ln_namelen = strlen(e);
1780 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
1782 lu_object_put(env, parent);
1787 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
1788 lu_object_put(env, parent);
1789 if (IS_ERR(child)) {
1790 rc = (int)PTR_ERR(child);
1791 CERROR("lookup %s under parent "DFID": rc = %d\n",
1792 lname->ln_name, PFID(lu_object_fid(ld_parent)),
1800 RETURN(ERR_PTR(rc));
1802 if (!lu_object_exists(parent)) {
1803 lu_object_put(env, parent);
1804 parent = ERR_PTR(-ENOENT);
1810 static void echo_ucred_init(struct lu_env *env)
1812 struct lu_ucred *ucred = lu_ucred(env);
1813 kernel_cap_t kcap = current_cap();
1815 ucred->uc_valid = UCRED_INVALID;
1817 ucred->uc_suppgids[0] = -1;
1818 ucred->uc_suppgids[1] = -1;
1820 ucred->uc_uid = ucred->uc_o_uid =
1821 from_kuid(&init_user_ns, current_uid());
1822 ucred->uc_gid = ucred->uc_o_gid =
1823 from_kgid(&init_user_ns, current_gid());
1824 ucred->uc_fsuid = ucred->uc_o_fsuid =
1825 from_kuid(&init_user_ns, current_fsuid());
1826 ucred->uc_fsgid = ucred->uc_o_fsgid =
1827 from_kgid(&init_user_ns, current_fsgid());
1828 ucred->uc_cap = current_cap();
1830 /* remove fs privilege for non-root user. */
1831 if (ucred->uc_fsuid) {
1832 kcap = cap_drop_nfsd_set(kcap);
1833 kcap = cap_drop_fs_set(kcap);
1835 ucred->uc_cap = kcap;
1836 ucred->uc_valid = UCRED_NEW;
1837 /* do not let rbac interfere with obdecho */
1838 ucred->uc_rbac_file_perms = 1;
1839 ucred->uc_rbac_dne_ops = 1;
1840 ucred->uc_rbac_quota_ops = 1;
1841 ucred->uc_rbac_byfid_ops = 1;
1842 ucred->uc_rbac_chlg_ops = 1;
1843 ucred->uc_rbac_fscrypt_admin = 1;
1846 static void echo_ucred_fini(struct lu_env *env)
1848 struct lu_ucred *ucred = lu_ucred(env);
1850 ucred->uc_valid = UCRED_INIT;
1853 static int echo_md_handler(struct echo_device *ed, int command,
1854 char *path, int path_len, __u64 id, int count,
1855 struct obd_ioctl_data *data)
1857 struct echo_thread_info *info;
1858 struct lu_device *ld = ed->ed_next;
1861 struct lu_object *parent;
1863 int namelen = data->ioc_plen2;
1868 CERROR("MD echo client is not being initialized properly\n");
1872 if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1873 CERROR("Only support MDD layer right now!\n");
1877 env = cl_env_get(&refcheck);
1879 RETURN(PTR_ERR(env));
1881 rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_SES_TAG);
1885 /* init big_lmm buffer */
1886 info = echo_env_info(env);
1887 LASSERT(info->eti_big_lmm == NULL);
1888 OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
1889 if (!info->eti_big_lmm)
1890 GOTO(out_env, rc = -ENOMEM);
1891 info->eti_big_lmmsize = MIN_MD_SIZE;
1893 parent = echo_resolve_path(env, ed, path, path_len);
1894 if (IS_ERR(parent)) {
1895 CERROR("Can not resolve the path %s: rc = %ld\n", path,
1897 GOTO(out_free, rc = PTR_ERR(parent));
1901 OBD_ALLOC(name, namelen + 1);
1903 GOTO(out_put, rc = -ENOMEM);
1904 if (copy_from_user(name, data->ioc_pbuf2, namelen))
1905 GOTO(out_name, rc = -EFAULT);
1908 echo_ucred_init(env);
1911 case ECHO_MD_CREATE:
1912 case ECHO_MD_MKDIR: {
1913 struct echo_thread_info *info = echo_env_info(env);
1914 __u32 mode = data->ioc_obdo2.o_mode;
1915 struct lu_fid *fid = &info->eti_fid;
1916 int stripe_count = (int)data->ioc_obdo2.o_misc;
1917 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
1919 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
1924 * In the function below, .hs_keycmp resolves to
1925 * lu_obj_hop_keycmp()
1927 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
1928 id, mode, count, stripe_count,
1932 case ECHO_MD_DESTROY:
1933 case ECHO_MD_RMDIR: {
1934 __u32 mode = data->ioc_obdo2.o_mode;
1936 rc = echo_destroy_object(env, ed, parent, name, namelen,
1940 case ECHO_MD_LOOKUP:
1941 rc = echo_lookup_object(env, ed, parent, id, count);
1943 case ECHO_MD_GETATTR:
1944 rc = echo_getattr_object(env, ed, parent, id, count);
1946 case ECHO_MD_SETATTR:
1947 rc = echo_setattr_object(env, ed, parent, id, count);
1950 CERROR("unknown command %d\n", command);
1954 echo_ucred_fini(env);
1958 OBD_FREE(name, namelen + 1);
1960 lu_object_put(env, parent);
1962 LASSERT(info->eti_big_lmm);
1963 OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
1964 info->eti_big_lmm = NULL;
1965 info->eti_big_lmmsize = 0;
1967 cl_env_put(env, &refcheck);
1970 #endif /* HAVE_SERVER_SUPPORT */
1972 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1975 struct echo_object *eco;
1976 struct echo_client_obd *ec = ed->ed_ec;
1981 if (!(oa->o_valid & OBD_MD_FLID) ||
1982 !(oa->o_valid & OBD_MD_FLGROUP) ||
1983 !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
1984 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
1988 if (ostid_id(&oa->o_oi) == 0) {
1989 rc = ostid_set_id(&oa->o_oi, ++last_object_id);
1994 rc = obd_create(env, ec->ec_exp, oa);
1996 CERROR("Cannot create objects: rc = %d\n", rc);
2002 oa->o_valid |= OBD_MD_FLID;
2004 eco = cl_echo_object_find(ed, &oa->o_oi);
2006 GOTO(failed, rc = PTR_ERR(eco));
2007 cl_echo_object_put(eco);
2009 CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
2013 if (created && rc != 0)
2014 obd_destroy(env, ec->ec_exp, oa);
2017 CERROR("create object failed with: rc = %d\n", rc);
2022 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2025 struct echo_object *eco;
2029 if (!(oa->o_valid & OBD_MD_FLID) ||
2030 !(oa->o_valid & OBD_MD_FLGROUP) ||
2031 ostid_id(&oa->o_oi) == 0) {
2032 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2037 eco = cl_echo_object_find(ed, &oa->o_oi);
2046 static void echo_put_object(struct echo_object *eco)
2050 rc = cl_echo_object_put(eco);
2052 CERROR("%s: echo client drop an object failed: rc = %d\n",
2053 eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
2056 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
2057 u64 offset, u64 count)
2064 /* no partial pages on the client */
2065 LASSERT(count == PAGE_SIZE);
2069 for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2070 if (rw == OBD_BRW_WRITE) {
2071 stripe_off = offset + delta;
2074 stripe_off = 0xdeadbeef00c0ffeeULL;
2075 stripe_id = 0xdeadbeef00c0ffeeULL;
2077 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2078 stripe_off, stripe_id);
2085 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
2094 /* no partial pages on the client */
2095 LASSERT(count == PAGE_SIZE);
2099 for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2100 stripe_off = offset + delta;
2103 rc2 = block_debug_check("test_brw",
2104 addr + delta, OBD_ECHO_BLOCK_SIZE,
2105 stripe_off, stripe_id);
2107 CERROR("Error in echo object %#llx\n", id);
2116 static int echo_client_prep_commit(const struct lu_env *env,
2117 struct obd_export *exp, int rw,
2118 struct obdo *oa, struct echo_object *eco,
2119 u64 offset, u64 count,
2120 u64 batch, int async)
2122 struct obd_ioobj ioo;
2123 struct niobuf_local *lnb;
2124 struct niobuf_remote rnb;
2126 u64 npages, tot_pages, apc;
2127 int i, ret = 0, brw_flags = 0;
2130 if (count <= 0 || (count & ~PAGE_MASK) != 0)
2133 apc = npages = batch >> PAGE_SHIFT;
2134 tot_pages = count >> PAGE_SHIFT;
2136 OBD_ALLOC_PTR_ARRAY_LARGE(lnb, apc);
2140 if (rw == OBD_BRW_WRITE && async)
2141 brw_flags |= OBD_BRW_ASYNC;
2143 obdo_to_ioobj(oa, &ioo);
2147 for (; tot_pages > 0; tot_pages -= npages) {
2150 if (tot_pages < npages)
2153 rnb.rnb_offset = off;
2154 rnb.rnb_len = npages * PAGE_SIZE;
2155 rnb.rnb_flags = brw_flags;
2157 off += npages * PAGE_SIZE;
2160 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
2164 for (i = 0; i < lpages; i++) {
2165 struct page *page = lnb[i].lnb_page;
2167 /* read past eof? */
2168 if (!page && lnb[i].lnb_rc == 0)
2172 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
2174 if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2175 (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2176 (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2179 if (rw == OBD_BRW_WRITE)
2180 echo_client_page_debug_setup(page, rw,
2181 ostid_id(&oa->o_oi),
2182 lnb[i].lnb_file_offset,
2185 echo_client_page_debug_check(page,
2186 ostid_id(&oa->o_oi),
2187 lnb[i].lnb_file_offset,
2191 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
2192 ret, rnb.rnb_len, ktime_set(0, 0));
2196 /* Reuse env context. */
2197 lu_context_exit((struct lu_context *)&env->le_ctx);
2198 lu_context_enter((struct lu_context *)&env->le_ctx);
2202 OBD_FREE_PTR_ARRAY_LARGE(lnb, apc);
2207 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2208 struct obd_export *exp,
2209 struct obd_ioctl_data *data)
2211 struct obd_device *obd = class_exp2obd(exp);
2212 struct echo_device *ed = obd2echo_dev(obd);
2213 struct echo_client_obd *ec = ed->ed_ec;
2214 struct obdo *oa = &data->ioc_obdo1;
2215 struct echo_object *eco;
2221 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2223 rc = echo_get_object(&eco, ed, oa);
2227 oa->o_valid &= ~OBD_MD_FLHANDLE;
2229 /* OFD/obdfilter works only via prep/commit */
2230 test_mode = (long)data->ioc_pbuf1;
2231 if (!ed->ed_next && test_mode != 3) {
2233 data->ioc_plen1 = data->ioc_count;
2239 /* Truncate batch size to maximum */
2240 if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2241 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2243 switch (test_mode) {
2245 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
2246 data->ioc_offset, data->ioc_count,
2247 data->ioc_plen1, async);
2253 echo_put_object(eco);
2259 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2260 void *karg, void __user *uarg)
2262 #ifdef HAVE_SERVER_SUPPORT
2263 struct tgt_session_info *tsi;
2265 struct obd_device *obd = exp->exp_obd;
2266 struct echo_device *ed = obd2echo_dev(obd);
2267 struct echo_client_obd *ec = ed->ed_ec;
2268 struct echo_object *eco;
2269 struct obd_ioctl_data *data;
2271 unsigned long env_tags = 0;
2275 int rw = OBD_BRW_READ;
2279 CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
2280 exp->exp_obd->obd_name, cmd, len, karg, uarg);
2281 if (unlikely(karg == NULL))
2282 RETURN(OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL", rc));
2285 oa = &data->ioc_obdo1;
2286 if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2287 oa->o_valid |= OBD_MD_FLGROUP;
2288 ostid_set_seq_echo(&oa->o_oi);
2291 /* This FID is unpacked just for validation at this point */
2292 rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2296 env = cl_env_get(&refcheck);
2298 RETURN(PTR_ERR(env));
2302 #ifdef HAVE_SERVER_SUPPORT
2303 if (cmd == OBD_IOC_ECHO_MD || cmd == OBD_IOC_ECHO_ALLOC_SEQ)
2304 env_tags = ECHO_MD_CTX_TAG;
2307 env_tags = ECHO_DT_CTX_TAG;
2309 rc = lu_env_refill_by_tags(env, env_tags, ECHO_SES_TAG);
2313 #ifdef HAVE_SERVER_SUPPORT
2314 tsi = tgt_ses_info(env);
2315 /* treat as local operation */
2316 tsi->tsi_exp = NULL;
2317 tsi->tsi_jobid = NULL;
2321 case OBD_IOC_CREATE: /* may create echo object */
2322 if (!capable(CAP_SYS_ADMIN))
2323 GOTO(out, rc = -EPERM);
2325 rc = echo_create_object(env, ed, oa);
2328 #ifdef HAVE_SERVER_SUPPORT
2329 case OBD_IOC_ECHO_MD: {
2336 if (!capable(CAP_SYS_ADMIN))
2337 GOTO(out, rc = -EPERM);
2339 count = data->ioc_count;
2340 cmd = data->ioc_command;
2342 id = data->ioc_obdo2.o_oi.oi.oi_id;
2343 dirlen = data->ioc_plen1;
2344 OBD_ALLOC(dir, dirlen + 1);
2346 GOTO(out, rc = -ENOMEM);
2348 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2349 OBD_FREE(dir, data->ioc_plen1 + 1);
2350 GOTO(out, rc = -EFAULT);
2353 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2354 OBD_FREE(dir, dirlen + 1);
2357 case OBD_IOC_ECHO_ALLOC_SEQ: {
2361 if (!capable(CAP_SYS_ADMIN))
2362 GOTO(out, rc = -EPERM);
2364 rc = seq_client_get_seq(env, ed->ed_cl_seq, &seq);
2366 CERROR("%s: Can not alloc seq: rc = %d\n",
2371 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2374 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2375 if (copy_to_user(data->ioc_pbuf2, &max_count,
2380 #endif /* HAVE_SERVER_SUPPORT */
2381 case OBD_IOC_DESTROY:
2382 if (!capable(CAP_SYS_ADMIN))
2383 GOTO(out, rc = -EPERM);
2385 rc = echo_get_object(&eco, ed, oa);
2387 rc = obd_destroy(env, ec->ec_exp, oa);
2389 eco->eo_deleted = 1;
2390 echo_put_object(eco);
2394 case OBD_IOC_GETATTR:
2395 rc = echo_get_object(&eco, ed, oa);
2397 rc = obd_getattr(env, ec->ec_exp, oa);
2398 echo_put_object(eco);
2402 case OBD_IOC_SETATTR:
2403 if (!capable(CAP_SYS_ADMIN))
2404 GOTO(out, rc = -EPERM);
2406 rc = echo_get_object(&eco, ed, oa);
2408 rc = obd_setattr(env, ec->ec_exp, oa);
2409 echo_put_object(eco);
2413 case OBD_IOC_BRW_WRITE:
2414 if (!capable(CAP_SYS_ADMIN))
2415 GOTO(out, rc = -EPERM);
2419 case OBD_IOC_BRW_READ:
2420 rc = echo_client_brw_ioctl(env, rw, exp, data);
2423 rc = OBD_IOC_ERROR(obd->obd_name, cmd, "unrecognized", -ENOTTY);
2430 cl_env_put(env, &refcheck);
2435 static int echo_client_setup(const struct lu_env *env,
2436 struct obd_device *obd, struct lustre_cfg *lcfg)
2438 struct echo_client_obd *ec = &obd->u.echo_client;
2439 struct obd_device *tgt;
2440 struct obd_uuid echo_uuid = { "ECHO_UUID" };
2441 struct obd_connect_data *ocd = NULL;
2445 if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2446 CERROR("requires a TARGET OBD name\n");
2450 tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2451 if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2452 CERROR("device not attached or not set up (%s)\n",
2453 lustre_cfg_string(lcfg, 1));
2457 spin_lock_init(&ec->ec_lock);
2458 INIT_LIST_HEAD(&ec->ec_objects);
2459 INIT_LIST_HEAD(&ec->ec_locks);
2462 lu_context_tags_update(ECHO_DT_CTX_TAG);
2463 lu_session_tags_update(ECHO_SES_TAG);
2465 if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2466 #ifdef HAVE_SERVER_SUPPORT
2467 lu_context_tags_update(ECHO_MD_CTX_TAG);
2470 "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
2475 OBD_ALLOC(ocd, sizeof(*ocd));
2477 CERROR("Can't alloc ocd connecting to %s\n",
2478 lustre_cfg_string(lcfg, 1));
2482 ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2483 OBD_CONNECT_BRW_SIZE |
2484 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2485 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2486 OBD_CONNECT_FID | OBD_CONNECT_FLAGS2;
2487 ocd->ocd_connect_flags2 = OBD_CONNECT2_REP_MBITS;
2489 ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2490 ocd->ocd_version = LUSTRE_VERSION_CODE;
2491 ocd->ocd_group = FID_SEQ_ECHO;
2493 rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2495 /* Turn off pinger because it connects to tgt obd directly. */
2496 spin_lock(&tgt->obd_dev_lock);
2497 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2498 spin_unlock(&tgt->obd_dev_lock);
2501 OBD_FREE(ocd, sizeof(*ocd));
2504 CERROR("fail to connect to device %s\n",
2505 lustre_cfg_string(lcfg, 1));
2512 static int echo_client_cleanup(struct obd_device *obd)
2514 struct echo_device *ed = obd2echo_dev(obd);
2515 struct echo_client_obd *ec = &obd->u.echo_client;
2519 /*Do nothing for Metadata echo client*/
2523 lu_session_tags_clear(ECHO_SES_TAG & ~LCT_SESSION);
2524 lu_context_tags_clear(ECHO_DT_CTX_TAG);
2525 if (ed->ed_next_ismd) {
2526 #ifdef HAVE_SERVER_SUPPORT
2527 lu_context_tags_clear(ECHO_MD_CTX_TAG);
2530 "This is client-side only module, does not support metadata echo client.\n");
2535 if (!list_empty(&obd->obd_exports)) {
2536 CERROR("still has clients!\n");
2540 LASSERT(refcount_read(&ec->ec_exp->exp_handle.h_ref) > 0);
2541 rc = obd_disconnect(ec->ec_exp);
2543 CERROR("fail to disconnect device: %d\n", rc);
2548 static int echo_client_connect(const struct lu_env *env,
2549 struct obd_export **exp,
2550 struct obd_device *src, struct obd_uuid *cluuid,
2551 struct obd_connect_data *data, void *localdata)
2554 struct lustre_handle conn = { 0 };
2557 rc = class_connect(&conn, src, cluuid);
2559 *exp = class_conn2export(&conn);
2564 static int echo_client_disconnect(struct obd_export *exp)
2570 GOTO(out, rc = -EINVAL);
2572 rc = class_disconnect(exp);
2578 static const struct obd_ops echo_client_obd_ops = {
2579 .o_owner = THIS_MODULE,
2580 .o_iocontrol = echo_client_iocontrol,
2581 .o_connect = echo_client_connect,
2582 .o_disconnect = echo_client_disconnect
2585 static int __init obdecho_init(void)
2590 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2592 LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2594 rc = libcfs_setup();
2598 # ifdef HAVE_SERVER_SUPPORT
2599 rc = echo_persistent_pages_init();
2603 rc = class_register_type(&echo_obd_ops, NULL, true,
2604 LUSTRE_ECHO_NAME, &echo_srv_type);
2609 rc = lu_kmem_init(echo_caches);
2611 rc = class_register_type(&echo_client_obd_ops, NULL, false,
2612 LUSTRE_ECHO_CLIENT_NAME,
2615 lu_kmem_fini(echo_caches);
2618 # ifdef HAVE_SERVER_SUPPORT
2622 class_unregister_type(LUSTRE_ECHO_NAME);
2624 echo_persistent_pages_fini();
2630 static void __exit obdecho_exit(void)
2632 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2633 lu_kmem_fini(echo_caches);
2635 #ifdef HAVE_SERVER_SUPPORT
2636 class_unregister_type(LUSTRE_ECHO_NAME);
2637 echo_persistent_pages_fini();
2641 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2642 MODULE_DESCRIPTION("Lustre Echo Client test driver");
2643 MODULE_VERSION(LUSTRE_VERSION_STRING);
2644 MODULE_LICENSE("GPL");
2646 module_init(obdecho_init);
2647 module_exit(obdecho_exit);
2649 /** @} echo_client */