Whamcloud - gitweb
LU-8901 misc: update Intel copyright messages for 2016
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_ECHO
34
35 #include <linux/user_namespace.h>
36 #ifdef HAVE_UIDGID_HEADER
37 # include <linux/uidgid.h>
38 #endif
39 #include <libcfs/libcfs.h>
40
41 #include <obd.h>
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <lustre_debug.h>
45 #include <lprocfs_status.h>
46 #include <cl_object.h>
47 #include <lustre_fid.h>
48 #include <lustre_acl.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_net.h>
51 #ifdef HAVE_SERVER_SUPPORT
52 # include <md_object.h>
53
54 #define ETI_NAME_LEN    20
55
56 #endif /* HAVE_SERVER_SUPPORT */
57
58 #include "echo_internal.h"
59
60 /** \defgroup echo_client Echo Client
61  * @{
62  */
63
64 struct echo_device {
65         struct cl_device          ed_cl;
66         struct echo_client_obd   *ed_ec;
67
68         struct cl_site            ed_site_myself;
69         struct lu_site           *ed_site;
70         struct lu_device         *ed_next;
71         int                       ed_next_ismd;
72         struct lu_client_seq     *ed_cl_seq;
73 #ifdef HAVE_SERVER_SUPPORT
74         struct local_oid_storage *ed_los;
75         struct lu_fid             ed_root_fid;
76 #endif /* HAVE_SERVER_SUPPORT */
77 };
78
79 struct echo_object {
80         struct cl_object        eo_cl;
81         struct cl_object_header eo_hdr;
82         struct echo_device     *eo_dev;
83         struct list_head        eo_obj_chain;
84         struct lov_oinfo       *eo_oinfo;
85         atomic_t                eo_npages;
86         int                     eo_deleted;
87 };
88
89 struct echo_object_conf {
90         struct cl_object_conf   eoc_cl;
91         struct lov_oinfo      **eoc_oinfo;
92 };
93
94 struct echo_page {
95         struct cl_page_slice    ep_cl;
96         struct mutex            ep_lock;
97 };
98
99 struct echo_lock {
100         struct cl_lock_slice    el_cl;
101         struct list_head        el_chain;
102         struct echo_object     *el_object;
103         __u64                   el_cookie;
104         atomic_t                el_refcount;
105 };
106
107 #ifdef HAVE_SERVER_SUPPORT
108 static const char echo_md_root_dir_name[] = "ROOT_ECHO";
109
110 /**
111  * In order to use the values of members in struct mdd_device,
112  * we define an alias structure here.
113  */
114 struct echo_md_device {
115         struct md_device                 emd_md_dev;
116         struct obd_export               *emd_child_exp;
117         struct dt_device                *emd_child;
118         struct dt_device                *emd_bottom;
119         struct lu_fid                    emd_root_fid;
120         struct lu_fid                    emd_local_root_fid;
121 };
122 #endif /* HAVE_SERVER_SUPPORT */
123
/* Forward declarations: used by echo_device_alloc() ahead of the definitions. */
static int echo_client_setup(const struct lu_env *env,
                             struct obd_device *obddev, struct lustre_cfg *lcfg);

static int echo_client_cleanup(struct obd_device *obddev);
128
129
130 /** \defgroup echo_helpers Helper functions
131  * @{
132  */
133 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
134 {
135         return container_of0(dev, struct echo_device, ed_cl);
136 }
137
138 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
139 {
140         return &d->ed_cl;
141 }
142
143 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
144 {
145         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
146 }
147
148 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
149 {
150         return &eco->eo_cl;
151 }
152
153 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
154 {
155         return container_of(o, struct echo_object, eo_cl);
156 }
157
158 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
159 {
160         return container_of(s, struct echo_page, ep_cl);
161 }
162
163 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
164 {
165         return container_of(s, struct echo_lock, el_cl);
166 }
167
168 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
169 {
170         return ecl->el_cl.cls_lock;
171 }
172
173 static struct lu_context_key echo_thread_key;
174 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
175 {
176         struct echo_thread_info *info;
177         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
178         LASSERT(info != NULL);
179         return info;
180 }
181
182 static inline
183 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
184 {
185         return container_of(c, struct echo_object_conf, eoc_cl);
186 }
187
188 #ifdef HAVE_SERVER_SUPPORT
189 static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
190 {
191         return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
192 }
193
194 static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
195 {
196         return &d->emd_md_dev.md_lu_dev;
197 }
198
199 static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
200 {
201         return emd2lu_dev(d)->ld_site->ld_seq_site;
202 }
203
204 static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
205 {
206         return d->emd_md_dev.md_lu_dev.ld_obd;
207 }
208 #endif /* HAVE_SERVER_SUPPORT */
209
210 /** @} echo_helpers */
211
212 static int cl_echo_object_put(struct echo_object *eco);
213 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
214                               struct page **pages, int npages, int async);
215
216 struct echo_thread_info {
217         struct echo_object_conf eti_conf;
218         struct lustre_md        eti_md;
219
220         struct cl_2queue        eti_queue;
221         struct cl_io            eti_io;
222         struct cl_lock          eti_lock;
223         struct lu_fid           eti_fid;
224         struct lu_fid           eti_fid2;
225 #ifdef HAVE_SERVER_SUPPORT
226         struct md_op_spec       eti_spec;
227         struct lov_mds_md_v3    eti_lmm;
228         struct lov_user_md_v3   eti_lum;
229         struct md_attr          eti_ma;
230         struct lu_name          eti_lname;
231         /* per-thread values, can be re-used */
232         void                    *eti_big_lmm; /* may be vmalloc'd */
233         int                     eti_big_lmmsize;
234         char                    eti_name[ETI_NAME_LEN];
235         struct lu_buf           eti_buf;
236         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
237 #endif
238 };
239
/*
 * Session data placeholder: no session state is used right now, so the
 * structure only carries a dummy member.
 */
struct echo_session_info {
        unsigned long dummy;
};
244
/* Slab caches for echo client structures; described by echo_caches[]. */
static struct kmem_cache *echo_lock_kmem;       /* struct echo_lock */
static struct kmem_cache *echo_object_kmem;     /* struct echo_object */
static struct kmem_cache *echo_thread_kmem;     /* struct echo_thread_info */
static struct kmem_cache *echo_session_kmem;    /* struct echo_session_info */
/* static struct kmem_cache *echo_req_kmem; */
250
251 static struct lu_kmem_descr echo_caches[] = {
252         {
253                 .ckd_cache = &echo_lock_kmem,
254                 .ckd_name  = "echo_lock_kmem",
255                 .ckd_size  = sizeof (struct echo_lock)
256         },
257         {
258                 .ckd_cache = &echo_object_kmem,
259                 .ckd_name  = "echo_object_kmem",
260                 .ckd_size  = sizeof (struct echo_object)
261         },
262         {
263                 .ckd_cache = &echo_thread_kmem,
264                 .ckd_name  = "echo_thread_kmem",
265                 .ckd_size  = sizeof (struct echo_thread_info)
266         },
267         {
268                 .ckd_cache = &echo_session_kmem,
269                 .ckd_name  = "echo_session_kmem",
270                 .ckd_size  = sizeof (struct echo_session_info)
271         },
272         {
273                 .ckd_cache = NULL
274         }
275 };
276
277 /** \defgroup echo_page Page operations
278  *
279  * Echo page operations.
280  *
281  * @{
282  */
283 static int echo_page_own(const struct lu_env *env,
284                          const struct cl_page_slice *slice,
285                          struct cl_io *io, int nonblock)
286 {
287         struct echo_page *ep = cl2echo_page(slice);
288
289         if (!nonblock)
290                 mutex_lock(&ep->ep_lock);
291         else if (!mutex_trylock(&ep->ep_lock))
292                 return -EAGAIN;
293         return 0;
294 }
295
296 static void echo_page_disown(const struct lu_env *env,
297                              const struct cl_page_slice *slice,
298                              struct cl_io *io)
299 {
300         struct echo_page *ep = cl2echo_page(slice);
301
302         LASSERT(mutex_is_locked(&ep->ep_lock));
303         mutex_unlock(&ep->ep_lock);
304 }
305
306 static void echo_page_discard(const struct lu_env *env,
307                               const struct cl_page_slice *slice,
308                               struct cl_io *unused)
309 {
310         cl_page_delete(env, slice->cpl_page);
311 }
312
313 static int echo_page_is_vmlocked(const struct lu_env *env,
314                                  const struct cl_page_slice *slice)
315 {
316         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
317                 return -EBUSY;
318         return -ENODATA;
319 }
320
321 static void echo_page_completion(const struct lu_env *env,
322                                  const struct cl_page_slice *slice,
323                                  int ioret)
324 {
325         LASSERT(slice->cpl_page->cp_sync_io != NULL);
326 }
327
328 static void echo_page_fini(const struct lu_env *env,
329                            struct cl_page_slice *slice)
330 {
331         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
332         ENTRY;
333
334         atomic_dec(&eco->eo_npages);
335         put_page(slice->cpl_page->cp_vmpage);
336         EXIT;
337 }
338
/**
 * Prepare a page for transfer. The echo client has nothing to prepare.
 *
 * \retval 0 always
 */
static int echo_page_prep(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          struct cl_io *unused)
{
        /* no per-page preparation is needed */
        return 0;
}
345
346 static int echo_page_print(const struct lu_env *env,
347                            const struct cl_page_slice *slice,
348                            void *cookie, lu_printer_t printer)
349 {
350         struct echo_page *ep = cl2echo_page(slice);
351
352         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
353                    ep, mutex_is_locked(&ep->ep_lock),
354                    slice->cpl_page->cp_vmpage);
355         return 0;
356 }
357
358 static const struct cl_page_operations echo_page_ops = {
359         .cpo_own           = echo_page_own,
360         .cpo_disown        = echo_page_disown,
361         .cpo_discard       = echo_page_discard,
362         .cpo_fini          = echo_page_fini,
363         .cpo_print         = echo_page_print,
364         .cpo_is_vmlocked   = echo_page_is_vmlocked,
365         .io = {
366                 [CRT_READ] = {
367                         .cpo_prep        = echo_page_prep,
368                         .cpo_completion  = echo_page_completion,
369                 },
370                 [CRT_WRITE] = {
371                         .cpo_prep        = echo_page_prep,
372                         .cpo_completion  = echo_page_completion,
373                 }
374         }
375 };
376 /** @} echo_page */
377
378 /** \defgroup echo_lock Locking
379  *
380  * echo lock operations
381  *
382  * @{
383  */
384 static void echo_lock_fini(const struct lu_env *env,
385                            struct cl_lock_slice *slice)
386 {
387         struct echo_lock *ecl = cl2echo_lock(slice);
388
389         LASSERT(list_empty(&ecl->el_chain));
390         OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
391 }
392
393 static struct cl_lock_operations echo_lock_ops = {
394         .clo_fini      = echo_lock_fini,
395 };
396
397 /** @} echo_lock */
398
399 /** \defgroup echo_cl_ops cl_object operations
400  *
401  * operations for cl_object
402  *
403  * @{
404  */
405 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
406                           struct cl_page *page, pgoff_t index)
407 {
408         struct echo_page *ep = cl_object_page_slice(obj, page);
409         struct echo_object *eco = cl2echo_obj(obj);
410         ENTRY;
411
412         get_page(page->cp_vmpage);
413         mutex_init(&ep->ep_lock);
414         cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
415         atomic_inc(&eco->eo_npages);
416         RETURN(0);
417 }
418
/**
 * IO initialization hook. The echo client needs no per-io state.
 *
 * \retval 0 always
 */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
                        struct cl_io *io)
{
        /* nothing to set up */
        return 0;
}
424
425 static int echo_lock_init(const struct lu_env *env,
426                           struct cl_object *obj, struct cl_lock *lock,
427                           const struct cl_io *unused)
428 {
429         struct echo_lock *el;
430         ENTRY;
431
432         OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
433         if (el != NULL) {
434                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
435                 el->el_object = cl2echo_obj(obj);
436                 INIT_LIST_HEAD(&el->el_chain);
437                 atomic_set(&el->el_refcount, 0);
438         }
439         RETURN(el == NULL ? -ENOMEM : 0);
440 }
441
/**
 * Configuration hook. The echo client ignores configuration changes.
 *
 * \retval 0 always
 */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
                         const struct cl_object_conf *conf)
{
        /* nothing to apply */
        return 0;
}
447
448 static const struct cl_object_operations echo_cl_obj_ops = {
449         .coo_page_init = echo_page_init,
450         .coo_lock_init = echo_lock_init,
451         .coo_io_init   = echo_io_init,
452         .coo_conf_set  = echo_conf_set
453 };
454 /** @} echo_cl_ops */
455
456 /** \defgroup echo_lu_ops lu_object operations
457  *
458  * operations for echo lu object.
459  *
460  * @{
461  */
462 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
463                             const struct lu_object_conf *conf)
464 {
465         struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
466         struct echo_client_obd *ec     = ed->ed_ec;
467         struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
468         ENTRY;
469
470         if (ed->ed_next) {
471                 struct lu_object  *below;
472                 struct lu_device  *under;
473
474                 under = ed->ed_next;
475                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
476                                                         under);
477                 if (below == NULL)
478                         RETURN(-ENOMEM);
479                 lu_object_add(obj, below);
480         }
481
482         if (!ed->ed_next_ismd) {
483                 const struct cl_object_conf *cconf = lu2cl_conf(conf);
484                 struct echo_object_conf *econf = cl2echo_conf(cconf);
485
486                 LASSERT(econf->eoc_oinfo != NULL);
487
488                 /* Transfer the oinfo pointer to eco that it won't be
489                  * freed. */
490                 eco->eo_oinfo = *econf->eoc_oinfo;
491                 *econf->eoc_oinfo = NULL;
492         } else {
493                 eco->eo_oinfo = NULL;
494         }
495
496         eco->eo_dev = ed;
497         atomic_set(&eco->eo_npages, 0);
498         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
499
500         spin_lock(&ec->ec_lock);
501         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
502         spin_unlock(&ec->ec_lock);
503
504         RETURN(0);
505 }
506
507 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
508 {
509         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
510         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
511         ENTRY;
512
513         LASSERT(atomic_read(&eco->eo_npages) == 0);
514
515         spin_lock(&ec->ec_lock);
516         list_del_init(&eco->eo_obj_chain);
517         spin_unlock(&ec->ec_lock);
518
519         lu_object_fini(obj);
520         lu_object_header_fini(obj->lo_header);
521
522         if (eco->eo_oinfo != NULL)
523                 OBD_FREE_PTR(eco->eo_oinfo);
524
525         OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
526         EXIT;
527 }
528
529 static int echo_object_print(const struct lu_env *env, void *cookie,
530                             lu_printer_t p, const struct lu_object *o)
531 {
532         struct echo_object *obj = cl2echo_obj(lu2cl(o));
533
534         return (*p)(env, cookie, "echoclient-object@%p", obj);
535 }
536
537 static const struct lu_object_operations echo_lu_obj_ops = {
538         .loo_object_init      = echo_object_init,
539         .loo_object_delete    = NULL,
540         .loo_object_release   = NULL,
541         .loo_object_free      = echo_object_free,
542         .loo_object_print     = echo_object_print,
543         .loo_object_invariant = NULL
544 };
545 /** @} echo_lu_ops */
546
547 /** \defgroup echo_lu_dev_ops  lu_device operations
548  *
549  * Operations for echo lu device.
550  *
551  * @{
552  */
553 static struct lu_object *echo_object_alloc(const struct lu_env *env,
554                                            const struct lu_object_header *hdr,
555                                            struct lu_device *dev)
556 {
557         struct echo_object *eco;
558         struct lu_object *obj = NULL;
559         ENTRY;
560
561         /* we're the top dev. */
562         LASSERT(hdr == NULL);
563         OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
564         if (eco != NULL) {
565                 struct cl_object_header *hdr = &eco->eo_hdr;
566
567                 obj = &echo_obj2cl(eco)->co_lu;
568                 cl_object_header_init(hdr);
569                 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
570
571                 lu_object_init(obj, &hdr->coh_lu, dev);
572                 lu_object_add_top(&hdr->coh_lu, obj);
573
574                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
575                 obj->lo_ops       = &echo_lu_obj_ops;
576         }
577         RETURN(obj);
578 }
579
580 static struct lu_device_operations echo_device_lu_ops = {
581         .ldo_object_alloc   = echo_object_alloc,
582 };
583
584 /** @} echo_lu_dev_ops */
585
586 /** \defgroup echo_init Setup and teardown
587  *
588  * Init and fini functions for echo client.
589  *
590  * @{
591  */
592 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
593 {
594         struct cl_site *site = &ed->ed_site_myself;
595         int rc;
596
597         /* initialize site */
598         rc = cl_site_init(site, &ed->ed_cl);
599         if (rc) {
600                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
601                 return rc;
602         }
603
604         rc = lu_site_init_finish(&site->cs_lu);
605         if (rc) {
606                 cl_site_fini(site);
607                 return rc;
608         }
609
610         ed->ed_site = &site->cs_lu;
611         return 0;
612 }
613
614 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
615 {
616         if (ed->ed_site) {
617                 if (!ed->ed_next_ismd)
618                         lu_site_fini(ed->ed_site);
619                 ed->ed_site = NULL;
620         }
621 }
622
623 static void *echo_thread_key_init(const struct lu_context *ctx,
624                                   struct lu_context_key *key)
625 {
626         struct echo_thread_info *info;
627
628         OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
629         if (info == NULL)
630                 info = ERR_PTR(-ENOMEM);
631         return info;
632 }
633
634 static void echo_thread_key_fini(const struct lu_context *ctx,
635                          struct lu_context_key *key, void *data)
636 {
637         struct echo_thread_info *info = data;
638         OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
639 }
640
641 static struct lu_context_key echo_thread_key = {
642         .lct_tags = LCT_CL_THREAD,
643         .lct_init = echo_thread_key_init,
644         .lct_fini = echo_thread_key_fini,
645 };
646
647 static void *echo_session_key_init(const struct lu_context *ctx,
648                                   struct lu_context_key *key)
649 {
650         struct echo_session_info *session;
651
652         OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
653         if (session == NULL)
654                 session = ERR_PTR(-ENOMEM);
655         return session;
656 }
657
658 static void echo_session_key_fini(const struct lu_context *ctx,
659                                  struct lu_context_key *key, void *data)
660 {
661         struct echo_session_info *session = data;
662         OBD_SLAB_FREE_PTR(session, echo_session_kmem);
663 }
664
665 static struct lu_context_key echo_session_key = {
666         .lct_tags = LCT_SESSION,
667         .lct_init = echo_session_key_init,
668         .lct_fini = echo_session_key_fini,
669 };
670
671 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
672
673 #ifdef HAVE_SERVER_SUPPORT
674 # define ECHO_SEQ_WIDTH 0xffffffff
675 static int echo_fid_init(struct echo_device *ed, char *obd_name,
676                          struct seq_server_site *ss)
677 {
678         char *prefix;
679         int rc;
680         ENTRY;
681
682         OBD_ALLOC_PTR(ed->ed_cl_seq);
683         if (ed->ed_cl_seq == NULL)
684                 RETURN(-ENOMEM);
685
686         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
687         if (prefix == NULL)
688                 GOTO(out_free_seq, rc = -ENOMEM);
689
690         snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
691
692         /* Init client side sequence-manager */
693         rc = seq_client_init(ed->ed_cl_seq, NULL,
694                              LUSTRE_SEQ_METADATA,
695                              prefix, ss->ss_server_seq);
696         ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
697         OBD_FREE(prefix, MAX_OBD_NAME + 5);
698         if (rc)
699                 GOTO(out_free_seq, rc);
700
701         RETURN(0);
702
703 out_free_seq:
704         OBD_FREE_PTR(ed->ed_cl_seq);
705         ed->ed_cl_seq = NULL;
706         RETURN(rc);
707 }
708
709 static int echo_fid_fini(struct obd_device *obddev)
710 {
711         struct echo_device *ed = obd2echo_dev(obddev);
712         ENTRY;
713
714         if (ed->ed_cl_seq != NULL) {
715                 seq_client_fini(ed->ed_cl_seq);
716                 OBD_FREE_PTR(ed->ed_cl_seq);
717                 ed->ed_cl_seq = NULL;
718         }
719
720         RETURN(0);
721 }
722
723 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
724 {
725         ENTRY;
726
727         if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
728                 local_oid_storage_fini(env, ed->ed_los);
729                 ed->ed_los = NULL;
730         }
731 }
732
733 static int
734 echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
735                           struct local_oid_storage *los,
736                           const struct lu_fid *pfid, const char *name,
737                           __u32 mode, struct lu_fid *fid)
738 {
739         struct dt_object        *parent = NULL;
740         struct dt_object        *dto = NULL;
741         int                      rc = 0;
742         ENTRY;
743
744         LASSERT(!fid_is_zero(pfid));
745         parent = dt_locate(env, emd->emd_bottom, pfid);
746         if (unlikely(IS_ERR(parent)))
747                 RETURN(PTR_ERR(parent));
748
749         /* create local file with @fid */
750         dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
751                                                  parent, name, mode);
752         if (IS_ERR(dto))
753                 GOTO(out_put, rc = PTR_ERR(dto));
754
755         *fid = *lu_object_fid(&dto->do_lu);
756         /* since stack is not fully set up the local_storage uses own stack
757          * and we should drop its object from cache */
758         lu_object_put_nocache(env, &dto->do_lu);
759
760         EXIT;
761 out_put:
762         lu_object_put(env, &parent->do_lu);
763         RETURN(rc);
764 }
765
766 static int
767 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
768                  struct echo_device *ed)
769 {
770         struct lu_fid                    fid;
771         int                              rc = 0;
772         ENTRY;
773
774         /* Setup local dirs */
775         fid.f_seq = FID_SEQ_LOCAL_NAME;
776         fid.f_oid = 1;
777         fid.f_ver = 0;
778         rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
779         if (rc != 0)
780                 RETURN(rc);
781
782         lu_echo_root_fid(&fid);
783         if (echo_md_seq_site(emd)->ss_node_id == 0) {
784                 rc = echo_md_local_file_create(env, emd, ed->ed_los,
785                                                &emd->emd_local_root_fid,
786                                                echo_md_root_dir_name, S_IFDIR |
787                                                S_IRUGO | S_IWUSR | S_IXUGO,
788                                                &fid);
789                 if (rc != 0) {
790                         CERROR("%s: create md echo root fid failed: rc = %d\n",
791                                emd2obd_dev(emd)->obd_name, rc);
792                         GOTO(out_los, rc);
793                 }
794         }
795         ed->ed_root_fid = fid;
796
797         RETURN(0);
798 out_los:
799         echo_ed_los_fini(env, ed);
800
801         RETURN(rc);
802 }
803 #endif /* HAVE_SERVER_SUPPORT */
804
805 static struct lu_device *echo_device_alloc(const struct lu_env *env,
806                                            struct lu_device_type *t,
807                                            struct lustre_cfg *cfg)
808 {
809         struct lu_device   *next;
810         struct echo_device *ed;
811         struct cl_device   *cd;
812         struct obd_device  *obd = NULL; /* to keep compiler happy */
813         struct obd_device  *tgt;
814         const char *tgt_type_name;
815         int rc;
816         int cleanup = 0;
817         ENTRY;
818
819         OBD_ALLOC_PTR(ed);
820         if (ed == NULL)
821                 GOTO(out, rc = -ENOMEM);
822
823         cleanup = 1;
824         cd = &ed->ed_cl;
825         rc = cl_device_init(cd, t);
826         if (rc)
827                 GOTO(out, rc);
828
829         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
830
831         cleanup = 2;
832         obd = class_name2obd(lustre_cfg_string(cfg, 0));
833         LASSERT(obd != NULL);
834         LASSERT(env != NULL);
835
836         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
837         if (tgt == NULL) {
838                 CERROR("Can not find tgt device %s\n",
839                         lustre_cfg_string(cfg, 1));
840                 GOTO(out, rc = -ENODEV);
841         }
842
843         next = tgt->obd_lu_dev;
844
845         if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
846                 ed->ed_next_ismd = 1;
847         } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
848                    strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
849                 ed->ed_next_ismd = 0;
850                 rc = echo_site_init(env, ed);
851                 if (rc)
852                         GOTO(out, rc);
853         } else {
854                 GOTO(out, rc = -EINVAL);
855         }
856
857         cleanup = 3;
858
859         rc = echo_client_setup(env, obd, cfg);
860         if (rc)
861                 GOTO(out, rc);
862
863         ed->ed_ec = &obd->u.echo_client;
864         cleanup = 4;
865
866         if (ed->ed_next_ismd) {
867 #ifdef HAVE_SERVER_SUPPORT
868                 /* Suppose to connect to some Metadata layer */
869                 struct lu_site          *ls = NULL;
870                 struct lu_device        *ld = NULL;
871                 struct md_device        *md = NULL;
872                 struct echo_md_device   *emd = NULL;
873                 int                      found = 0;
874
875                 if (next == NULL) {
876                         CERROR("%s is not lu device type!\n",
877                                lustre_cfg_string(cfg, 1));
878                         GOTO(out, rc = -EINVAL);
879                 }
880
881                 tgt_type_name = lustre_cfg_string(cfg, 2);
882                 if (!tgt_type_name) {
883                         CERROR("%s no type name for echo %s setup\n",
884                                 lustre_cfg_string(cfg, 1),
885                                 tgt->obd_type->typ_name);
886                         GOTO(out, rc = -EINVAL);
887                 }
888
889                 ls = next->ld_site;
890
891                 spin_lock(&ls->ls_ld_lock);
892                 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
893                         if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
894                                 found = 1;
895                                 break;
896                         }
897                 }
898                 spin_unlock(&ls->ls_ld_lock);
899
900                 if (found == 0) {
901                         CERROR("%s is not lu device type!\n",
902                                lustre_cfg_string(cfg, 1));
903                         GOTO(out, rc = -EINVAL);
904                 }
905
906                 next = ld;
907                 /* For MD echo client, it will use the site in MDS stack */
908                 ed->ed_site = ls;
909                 ed->ed_cl.cd_lu_dev.ld_site = ls;
910                 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
911                 if (rc) {
912                         CERROR("echo fid init error %d\n", rc);
913                         GOTO(out, rc);
914                 }
915
916                 md = lu2md_dev(next);
917                 emd = lu2emd_dev(&md->md_lu_dev);
918                 rc = echo_md_root_get(env, emd, ed);
919                 if (rc != 0) {
920                         CERROR("%s: get root error: rc = %d\n",
921                                 emd2obd_dev(emd)->obd_name, rc);
922                         GOTO(out, rc);
923                 }
924 #else /* !HAVE_SERVER_SUPPORT */
925                 CERROR("Local operations are NOT supported on client side. "
926                        "Only remote operations are supported. Metadata client "
927                        "must be run on server side.\n");
928                 GOTO(out, rc = -EOPNOTSUPP);
929 #endif /* HAVE_SERVER_SUPPORT */
930         } else {
931                  /* if echo client is to be stacked upon ost device, the next is
932                   * NULL since ost is not a clio device so far */
933                 if (next != NULL && !lu_device_is_cl(next))
934                         next = NULL;
935
936                 tgt_type_name = tgt->obd_type->typ_name;
937                 if (next != NULL) {
938                         LASSERT(next != NULL);
939                         if (next->ld_site != NULL)
940                                 GOTO(out, rc = -EBUSY);
941
942                         next->ld_site = ed->ed_site;
943                         rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
944                                                      next->ld_type->ldt_name,
945                                                      NULL);
946                         if (rc)
947                                 GOTO(out, rc);
948                 } else
949                         LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
950         }
951
952         ed->ed_next = next;
953         RETURN(&cd->cd_lu_dev);
954 out:
955         switch(cleanup) {
956         case 4: {
957                 int rc2;
958                 rc2 = echo_client_cleanup(obd);
959                 if (rc2)
960                         CERROR("Cleanup obd device %s error(%d)\n",
961                                obd->obd_name, rc2);
962         }
963
964         case 3:
965                 echo_site_fini(env, ed);
966         case 2:
967                 cl_device_fini(&ed->ed_cl);
968         case 1:
969                 OBD_FREE_PTR(ed);
970         case 0:
971         default:
972                 break;
973         }
974         return(ERR_PTR(rc));
975 }
976
/**
 * lu_device_type_operations::ldto_device_init() slot of the echo client.
 *
 * The body trips LBUG() unconditionally, so this entry point is not meant to
 * be reached; the trailing return only satisfies the signature.
 *
 * \param[in] env	execution environment (unused)
 * \param[in] d		echo device (unused)
 * \param[in] name	device name (unused)
 * \param[in] next	device below in the stack (unused)
 *
 * \retval 0	nominal value following LBUG()
 */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
			    const char *name, struct lu_device *next)
{
	LBUG();

	return 0;
}
983
984 static struct lu_device *echo_device_fini(const struct lu_env *env,
985                                           struct lu_device *d)
986 {
987         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
988         struct lu_device *next = ed->ed_next;
989
990         while (next && !ed->ed_next_ismd)
991                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
992         return NULL;
993 }
994
/**
 * Release the cl_lock that backs an echo lock.
 *
 * \param[in] env		execution environment
 * \param[in] ecl		echo lock whose cl_lock is released
 * \param[in] still_used	accepted for the caller's benefit but not
 *				consulted by this function
 */
static void echo_lock_release(const struct lu_env *env,
			      struct echo_lock *ecl,
			      int still_used)
{
	cl_lock_release(env, echo_lock2cl(ecl));
}
1003
1004 static struct lu_device *echo_device_free(const struct lu_env *env,
1005                                           struct lu_device *d)
1006 {
1007         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
1008         struct echo_client_obd *ec   = ed->ed_ec;
1009         struct echo_object     *eco;
1010         struct lu_device       *next = ed->ed_next;
1011
1012         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
1013                ed, next);
1014
1015         lu_site_purge(env, ed->ed_site, -1);
1016
1017         /* check if there are objects still alive.
1018          * It shouldn't have any object because lu_site_purge would cleanup
1019          * all of cached objects. Anyway, probably the echo device is being
1020          * parallelly accessed.
1021          */
1022         spin_lock(&ec->ec_lock);
1023         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
1024                 eco->eo_deleted = 1;
1025         spin_unlock(&ec->ec_lock);
1026
1027         /* purge again */
1028         lu_site_purge(env, ed->ed_site, -1);
1029
1030         CDEBUG(D_INFO,
1031                "Waiting for the reference of echo object to be dropped\n");
1032
1033         /* Wait for the last reference to be dropped. */
1034         spin_lock(&ec->ec_lock);
1035         while (!list_empty(&ec->ec_objects)) {
1036                 spin_unlock(&ec->ec_lock);
1037                 CERROR("echo_client still has objects at cleanup time, "
1038                        "wait for 1 second\n");
1039                 set_current_state(TASK_UNINTERRUPTIBLE);
1040                 schedule_timeout(cfs_time_seconds(1));
1041                 lu_site_purge(env, ed->ed_site, -1);
1042                 spin_lock(&ec->ec_lock);
1043         }
1044         spin_unlock(&ec->ec_lock);
1045
1046         LASSERT(list_empty(&ec->ec_locks));
1047
1048         CDEBUG(D_INFO, "No object exists, exiting...\n");
1049
1050         echo_client_cleanup(d->ld_obd);
1051 #ifdef HAVE_SERVER_SUPPORT
1052         echo_fid_fini(d->ld_obd);
1053         echo_ed_los_fini(env, ed);
1054 #endif
1055         while (next && !ed->ed_next_ismd)
1056                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
1057
1058         LASSERT(ed->ed_site == d->ld_site);
1059         echo_site_fini(env, ed);
1060         cl_device_fini(&ed->ed_cl);
1061         OBD_FREE_PTR(ed);
1062
1063         cl_env_cache_purge(~0);
1064
1065         return NULL;
1066 }
1067
1068 static const struct lu_device_type_operations echo_device_type_ops = {
1069         .ldto_init = echo_type_init,
1070         .ldto_fini = echo_type_fini,
1071
1072         .ldto_start = echo_type_start,
1073         .ldto_stop  = echo_type_stop,
1074
1075         .ldto_device_alloc = echo_device_alloc,
1076         .ldto_device_free  = echo_device_free,
1077         .ldto_device_init  = echo_device_init,
1078         .ldto_device_fini  = echo_device_fini
1079 };
1080
1081 static struct lu_device_type echo_device_type = {
1082         .ldt_tags     = LU_DEVICE_CL,
1083         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
1084         .ldt_ops      = &echo_device_type_ops,
1085         .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1086 };
1087 /** @} echo_init */
1088
1089 /** \defgroup echo_exports Exported operations
1090  *
1091  * exporting functions to echo client
1092  *
1093  * @{
1094  */
1095
1096 /* Interfaces to echo client obd device */
1097 static struct echo_object *
1098 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
1099 {
1100         struct lu_env *env;
1101         struct echo_thread_info *info;
1102         struct echo_object_conf *conf;
1103         struct echo_object *eco;
1104         struct cl_object *obj;
1105         struct lov_oinfo *oinfo = NULL;
1106         struct lu_fid *fid;
1107         __u16  refcheck;
1108         int rc;
1109         ENTRY;
1110
1111         LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
1112         LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
1113
1114         /* Never return an object if the obd is to be freed. */
1115         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1116                 RETURN(ERR_PTR(-ENODEV));
1117
1118         env = cl_env_get(&refcheck);
1119         if (IS_ERR(env))
1120                 RETURN((void *)env);
1121
1122         info = echo_env_info(env);
1123         conf = &info->eti_conf;
1124         if (d->ed_next) {
1125                 OBD_ALLOC_PTR(oinfo);
1126                 if (oinfo == NULL)
1127                         GOTO(out, eco = ERR_PTR(-ENOMEM));
1128
1129                 oinfo->loi_oi = *oi;
1130                 conf->eoc_cl.u.coc_oinfo = oinfo;
1131         }
1132
1133         /* If echo_object_init() is successful then ownership of oinfo
1134          * is transferred to the object. */
1135         conf->eoc_oinfo = &oinfo;
1136
1137         fid = &info->eti_fid;
1138         rc = ostid_to_fid(fid, oi, 0);
1139         if (rc != 0)
1140                 GOTO(out, eco = ERR_PTR(rc));
1141
1142         /* In the function below, .hs_keycmp resolves to
1143          * lu_obj_hop_keycmp() */
1144         /* coverity[overrun-buffer-val] */
1145         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1146         if (IS_ERR(obj))
1147                 GOTO(out, eco = (void*)obj);
1148
1149         eco = cl2echo_obj(obj);
1150         if (eco->eo_deleted) {
1151                 cl_object_put(env, obj);
1152                 eco = ERR_PTR(-EAGAIN);
1153         }
1154
1155 out:
1156         if (oinfo != NULL)
1157                 OBD_FREE_PTR(oinfo);
1158
1159         cl_env_put(env, &refcheck);
1160         RETURN(eco);
1161 }
1162
1163 static int cl_echo_object_put(struct echo_object *eco)
1164 {
1165         struct lu_env *env;
1166         struct cl_object *obj = echo_obj2cl(eco);
1167         __u16  refcheck;
1168         ENTRY;
1169
1170         env = cl_env_get(&refcheck);
1171         if (IS_ERR(env))
1172                 RETURN(PTR_ERR(env));
1173
1174         /* an external function to kill an object? */
1175         if (eco->eo_deleted) {
1176                 struct lu_object_header *loh = obj->co_lu.lo_header;
1177                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1178                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1179         }
1180
1181         cl_object_put(env, obj);
1182         cl_env_put(env, &refcheck);
1183         RETURN(0);
1184 }
1185
1186 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1187                             u64 start, u64 end, int mode,
1188                             __u64 *cookie , __u32 enqflags)
1189 {
1190         struct cl_io *io;
1191         struct cl_lock *lck;
1192         struct cl_object *obj;
1193         struct cl_lock_descr *descr;
1194         struct echo_thread_info *info;
1195         int rc = -ENOMEM;
1196         ENTRY;
1197
1198         info = echo_env_info(env);
1199         io = &info->eti_io;
1200         lck = &info->eti_lock;
1201         obj = echo_obj2cl(eco);
1202
1203         memset(lck, 0, sizeof(*lck));
1204         descr = &lck->cll_descr;
1205         descr->cld_obj   = obj;
1206         descr->cld_start = cl_index(obj, start);
1207         descr->cld_end   = cl_index(obj, end);
1208         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1209         descr->cld_enq_flags = enqflags;
1210         io->ci_obj = obj;
1211
1212         rc = cl_lock_request(env, io, lck);
1213         if (rc == 0) {
1214                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1215                 struct echo_lock *el;
1216
1217                 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1218                 spin_lock(&ec->ec_lock);
1219                 if (list_empty(&el->el_chain)) {
1220                         list_add(&el->el_chain, &ec->ec_locks);
1221                         el->el_cookie = ++ec->ec_unique;
1222                 }
1223                 atomic_inc(&el->el_refcount);
1224                 *cookie = el->el_cookie;
1225                 spin_unlock(&ec->ec_lock);
1226         }
1227         RETURN(rc);
1228 }
1229
1230 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1231                            __u64 cookie)
1232 {
1233         struct echo_client_obd *ec = ed->ed_ec;
1234         struct echo_lock       *ecl = NULL;
1235         struct list_head        *el;
1236         int found = 0, still_used = 0;
1237         ENTRY;
1238
1239         LASSERT(ec != NULL);
1240         spin_lock(&ec->ec_lock);
1241         list_for_each(el, &ec->ec_locks) {
1242                 ecl = list_entry(el, struct echo_lock, el_chain);
1243                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1244                 found = (ecl->el_cookie == cookie);
1245                 if (found) {
1246                         if (atomic_dec_and_test(&ecl->el_refcount))
1247                                 list_del_init(&ecl->el_chain);
1248                         else
1249                                 still_used = 1;
1250                         break;
1251                 }
1252         }
1253         spin_unlock(&ec->ec_lock);
1254
1255         if (!found)
1256                 RETURN(-ENOENT);
1257
1258         echo_lock_release(env, ecl, still_used);
1259         RETURN(0);
1260 }
1261
1262 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1263                                 struct cl_page *page)
1264 {
1265         struct echo_thread_info *info;
1266         struct cl_2queue        *queue;
1267
1268         info = echo_env_info(env);
1269         LASSERT(io == &info->eti_io);
1270
1271         queue = &info->eti_queue;
1272         cl_page_list_add(&queue->c2_qout, page);
1273 }
1274
1275 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1276                               struct page **pages, int npages, int async)
1277 {
1278         struct lu_env           *env;
1279         struct echo_thread_info *info;
1280         struct cl_object        *obj = echo_obj2cl(eco);
1281         struct echo_device      *ed  = eco->eo_dev;
1282         struct cl_2queue        *queue;
1283         struct cl_io            *io;
1284         struct cl_page          *clp;
1285         struct lustre_handle    lh = { 0 };
1286         int page_size = cl_page_size(obj);
1287         int rc;
1288         int i;
1289         __u16 refcheck;
1290         ENTRY;
1291
1292         LASSERT((offset & ~PAGE_MASK) == 0);
1293         LASSERT(ed->ed_next != NULL);
1294         env = cl_env_get(&refcheck);
1295         if (IS_ERR(env))
1296                 RETURN(PTR_ERR(env));
1297
1298         info    = echo_env_info(env);
1299         io      = &info->eti_io;
1300         queue   = &info->eti_queue;
1301
1302         cl_2queue_init(queue);
1303
1304         io->ci_ignore_layout = 1;
1305         rc = cl_io_init(env, io, CIT_MISC, obj);
1306         if (rc < 0)
1307                 GOTO(out, rc);
1308         LASSERT(rc == 0);
1309
1310
1311         rc = cl_echo_enqueue0(env, eco, offset,
1312                               offset + npages * PAGE_SIZE - 1,
1313                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1314                               CEF_NEVER);
1315         if (rc < 0)
1316                 GOTO(error_lock, rc);
1317
1318         for (i = 0; i < npages; i++) {
1319                 LASSERT(pages[i]);
1320                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1321                                    pages[i], CPT_TRANSIENT);
1322                 if (IS_ERR(clp)) {
1323                         rc = PTR_ERR(clp);
1324                         break;
1325                 }
1326                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1327
1328                 rc = cl_page_own(env, io, clp);
1329                 if (rc) {
1330                         LASSERT(clp->cp_state == CPS_FREEING);
1331                         cl_page_put(env, clp);
1332                         break;
1333                 }
1334
1335                 cl_2queue_add(queue, clp);
1336
1337                 /* drop the reference count for cl_page_find, so that the page
1338                  * will be freed in cl_2queue_fini. */
1339                 cl_page_put(env, clp);
1340                 cl_page_clip(env, clp, 0, page_size);
1341
1342                 offset += page_size;
1343         }
1344
1345         if (rc == 0) {
1346                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1347
1348                 async = async && (typ == CRT_WRITE);
1349                 if (async)
1350                         rc = cl_io_commit_async(env, io, &queue->c2_qin,
1351                                                 0, PAGE_SIZE,
1352                                                 echo_commit_callback);
1353                 else
1354                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1355                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1356                        async ? "async" : "sync", rc);
1357         }
1358
1359         cl_echo_cancel0(env, ed, lh.cookie);
1360         EXIT;
1361 error_lock:
1362         cl_2queue_discard(env, io, queue);
1363         cl_2queue_disown(env, io, queue);
1364         cl_2queue_fini(env, queue);
1365         cl_io_fini(env, io);
1366 out:
1367         cl_env_put(env, &refcheck);
1368         return rc;
1369 }
1370 /** @} echo_exports */
1371
1372
/* File-scope object id counter, zero until first assigned.
 * NOTE(review): its users are outside this part of the file - confirm how
 * it is advanced and whether access to it is serialized. */
static u64 last_object_id;
1374
1375 #ifdef HAVE_SERVER_SUPPORT
1376 static inline void echo_md_build_name(struct lu_name *lname, char *name,
1377                                       __u64 id)
1378 {
1379         snprintf(name, ETI_NAME_LEN, "%llu", id);
1380         lname->ln_name = name;
1381         lname->ln_namelen = strlen(name);
1382 }
1383
1384 /* similar to mdt_attr_get_complex */
1385 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1386                             struct md_attr *ma)
1387 {
1388         struct echo_thread_info *info = echo_env_info(env);
1389         int                      rc;
1390
1391         ENTRY;
1392
1393         LASSERT(ma->ma_lmm_size > 0);
1394
1395         rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1396         if (rc < 0)
1397                 RETURN(rc);
1398
1399         /* big_lmm may need to be grown */
1400         if (info->eti_big_lmmsize < rc) {
1401                 int size = size_roundup_power2(rc);
1402
1403                 if (info->eti_big_lmmsize > 0) {
1404                         /* free old buffer */
1405                         LASSERT(info->eti_big_lmm);
1406                         OBD_FREE_LARGE(info->eti_big_lmm,
1407                                        info->eti_big_lmmsize);
1408                         info->eti_big_lmm = NULL;
1409                         info->eti_big_lmmsize = 0;
1410                 }
1411
1412                 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1413                 if (info->eti_big_lmm == NULL)
1414                         RETURN(-ENOMEM);
1415                 info->eti_big_lmmsize = size;
1416         }
1417         LASSERT(info->eti_big_lmmsize >= rc);
1418
1419         info->eti_buf.lb_buf = info->eti_big_lmm;
1420         info->eti_buf.lb_len = info->eti_big_lmmsize;
1421         rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1422         if (rc < 0)
1423                 RETURN(rc);
1424
1425         ma->ma_valid |= MA_LOV;
1426         ma->ma_lmm = info->eti_big_lmm;
1427         ma->ma_lmm_size = rc;
1428
1429         RETURN(0);
1430 }
1431
1432 static int echo_attr_get_complex(const struct lu_env *env,
1433                                  struct md_object *next,
1434                                  struct md_attr *ma)
1435 {
1436         struct echo_thread_info *info = echo_env_info(env);
1437         struct lu_buf           *buf = &info->eti_buf;
1438         umode_t          mode = lu_object_attr(&next->mo_lu);
1439         int                      need = ma->ma_need;
1440         int                      rc = 0, rc2;
1441
1442         ENTRY;
1443
1444         ma->ma_valid = 0;
1445
1446         if (need & MA_INODE) {
1447                 ma->ma_need = MA_INODE;
1448                 rc = mo_attr_get(env, next, ma);
1449                 if (rc)
1450                         GOTO(out, rc);
1451                 ma->ma_valid |= MA_INODE;
1452         }
1453
1454         if (need & MA_LOV) {
1455                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1456                         LASSERT(ma->ma_lmm_size > 0);
1457                         buf->lb_buf = ma->ma_lmm;
1458                         buf->lb_len = ma->ma_lmm_size;
1459                         rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1460                         if (rc2 > 0) {
1461                                 ma->ma_lmm_size = rc2;
1462                                 ma->ma_valid |= MA_LOV;
1463                         } else if (rc2 == -ENODATA) {
1464                                 /* no LOV EA */
1465                                 ma->ma_lmm_size = 0;
1466                         } else if (rc2 == -ERANGE) {
1467                                 rc2 = echo_big_lmm_get(env, next, ma);
1468                                 if (rc2 < 0)
1469                                         GOTO(out, rc = rc2);
1470                         } else {
1471                                 GOTO(out, rc = rc2);
1472                         }
1473                 }
1474         }
1475
1476 #ifdef CONFIG_FS_POSIX_ACL
1477         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1478                 buf->lb_buf = ma->ma_acl;
1479                 buf->lb_len = ma->ma_acl_size;
1480                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1481                 if (rc2 > 0) {
1482                         ma->ma_acl_size = rc2;
1483                         ma->ma_valid |= MA_ACL_DEF;
1484                 } else if (rc2 == -ENODATA) {
1485                         /* no ACLs */
1486                         ma->ma_acl_size = 0;
1487                 } else {
1488                         GOTO(out, rc = rc2);
1489                 }
1490         }
1491 #endif
1492 out:
1493         ma->ma_need = need;
1494         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1495                rc, ma->ma_valid, ma->ma_lmm);
1496         RETURN(rc);
1497 }
1498
1499 static int
1500 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1501                         struct md_object *parent, struct lu_fid *fid,
1502                         struct lu_name *lname, struct md_op_spec *spec,
1503                         struct md_attr *ma)
1504 {
1505         struct lu_object        *ec_child, *child;
1506         struct lu_device        *ld = ed->ed_next;
1507         struct echo_thread_info *info = echo_env_info(env);
1508         struct lu_fid           *fid2 = &info->eti_fid2;
1509         struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
1510         int                      rc;
1511
1512         ENTRY;
1513
1514         rc = mdo_lookup(env, parent, lname, fid2, spec);
1515         if (rc == 0)
1516                 return -EEXIST;
1517         else if (rc != -ENOENT)
1518                 return rc;
1519
1520         ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1521                                      fid, &conf);
1522         if (IS_ERR(ec_child)) {
1523                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1524                         PTR_ERR(ec_child));
1525                 RETURN(PTR_ERR(ec_child));
1526         }
1527
1528         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1529         if (child == NULL) {
1530                 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1531                 GOTO(out_put, rc = -EINVAL);
1532         }
1533
1534         CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1535                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1536
1537         /*
1538          * Do not perform lookup sanity check. We know that name does not exist.
1539          */
1540         spec->sp_cr_lookup = 0;
1541         rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1542         if (rc) {
1543                 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1544                 GOTO(out_put, rc);
1545         }
1546         CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
1547                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1548         EXIT;
1549 out_put:
1550         lu_object_put(env, ec_child);
1551         return rc;
1552 }
1553
1554 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1555                              struct md_attr *ma)
1556 {
1557         struct echo_thread_info *info = echo_env_info(env);
1558
1559         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1560                 ma->ma_lmm = (void *)&info->eti_lmm;
1561                 ma->ma_lmm_size = sizeof(info->eti_lmm);
1562         } else {
1563                 LASSERT(info->eti_big_lmmsize);
1564                 ma->ma_lmm = info->eti_big_lmm;
1565                 ma->ma_lmm_size = info->eti_big_lmmsize;
1566         }
1567
1568         return 0;
1569 }
1570
1571 static int echo_create_md_object(const struct lu_env *env,
1572                                  struct echo_device *ed,
1573                                  struct lu_object *ec_parent,
1574                                  struct lu_fid *fid,
1575                                  char *name, int namelen,
1576                                  __u64 id, __u32 mode, int count,
1577                                  int stripe_count, int stripe_offset)
1578 {
1579         struct lu_object        *parent;
1580         struct echo_thread_info *info = echo_env_info(env);
1581         struct lu_name          *lname = &info->eti_lname;
1582         struct md_op_spec       *spec = &info->eti_spec;
1583         struct md_attr          *ma = &info->eti_ma;
1584         struct lu_device        *ld = ed->ed_next;
1585         int                      rc = 0;
1586         int                      i;
1587
1588         ENTRY;
1589
1590         if (ec_parent == NULL)
1591                 return -1;
1592         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1593         if (parent == NULL)
1594                 RETURN(-ENXIO);
1595
1596         memset(ma, 0, sizeof(*ma));
1597         memset(spec, 0, sizeof(*spec));
1598         if (stripe_count != 0) {
1599                 spec->sp_cr_flags |= FMODE_WRITE;
1600                 echo_set_lmm_size(env, ld, ma);
1601                 if (stripe_count != -1) {
1602                         struct lov_user_md_v3 *lum = &info->eti_lum;
1603
1604                         lum->lmm_magic = LOV_USER_MAGIC_V3;
1605                         lum->lmm_stripe_count = stripe_count;
1606                         lum->lmm_stripe_offset = stripe_offset;
1607                         lum->lmm_pattern = 0;
1608                         spec->u.sp_ea.eadata = lum;
1609                         spec->u.sp_ea.eadatalen = sizeof(*lum);
1610                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1611                 }
1612         }
1613
1614         ma->ma_attr.la_mode = mode;
1615         ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1616         ma->ma_attr.la_ctime = cfs_time_current_64();
1617
1618         if (name != NULL) {
1619                 lname->ln_name = name;
1620                 lname->ln_namelen = namelen;
1621                 /* If name is specified, only create one object by name */
1622                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1623                                              spec, ma);
1624                 RETURN(rc);
1625         }
1626
1627         /* Create multiple object sequenced by id */
1628         for (i = 0; i < count; i++) {
1629                 char *tmp_name = info->eti_name;
1630
1631                 echo_md_build_name(lname, tmp_name, id);
1632
1633                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1634                                              spec, ma);
1635                 if (rc) {
1636                         CERROR("Can not create child %s: rc = %d\n", tmp_name,
1637                                 rc);
1638                         break;
1639                 }
1640                 id++;
1641                 fid->f_oid++;
1642         }
1643
1644         RETURN(rc);
1645 }
1646
1647 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1648                                         struct echo_device *ed,
1649                                         struct md_object *parent,
1650                                         struct lu_name *lname)
1651 {
1652         struct echo_thread_info *info = echo_env_info(env);
1653         struct lu_fid           *fid = &info->eti_fid;
1654         struct lu_object        *child;
1655         int    rc;
1656         ENTRY;
1657
1658         CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1659                PFID(fid), parent);
1660         rc = mdo_lookup(env, parent, lname, fid, NULL);
1661         if (rc) {
1662                 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1663                 RETURN(ERR_PTR(rc));
1664         }
1665
1666         /* In the function below, .hs_keycmp resolves to
1667          * lu_obj_hop_keycmp() */
1668         /* coverity[overrun-buffer-val] */
1669         child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1670
1671         RETURN(child);
1672 }
1673
1674 static int echo_setattr_object(const struct lu_env *env,
1675                                struct echo_device *ed,
1676                                struct lu_object *ec_parent,
1677                                __u64 id, int count)
1678 {
1679         struct lu_object        *parent;
1680         struct echo_thread_info *info = echo_env_info(env);
1681         struct lu_name          *lname = &info->eti_lname;
1682         char                    *name = info->eti_name;
1683         struct lu_device        *ld = ed->ed_next;
1684         struct lu_buf           *buf = &info->eti_buf;
1685         int                      rc = 0;
1686         int                      i;
1687
1688         ENTRY;
1689
1690         if (ec_parent == NULL)
1691                 return -1;
1692         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1693         if (parent == NULL)
1694                 RETURN(-ENXIO);
1695
1696         for (i = 0; i < count; i++) {
1697                 struct lu_object *ec_child, *child;
1698
1699                 echo_md_build_name(lname, name, id);
1700
1701                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1702                 if (IS_ERR(ec_child)) {
1703                         CERROR("Can't find child %s: rc = %ld\n",
1704                                 lname->ln_name, PTR_ERR(ec_child));
1705                         RETURN(PTR_ERR(ec_child));
1706                 }
1707
1708                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1709                 if (child == NULL) {
1710                         CERROR("Can not locate the child %s\n", lname->ln_name);
1711                         lu_object_put(env, ec_child);
1712                         rc = -EINVAL;
1713                         break;
1714                 }
1715
1716                 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1717                        PFID(lu_object_fid(child)));
1718
1719                 buf->lb_buf = info->eti_xattr_buf;
1720                 buf->lb_len = sizeof(info->eti_xattr_buf);
1721
1722                 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1723                 rc = mo_xattr_set(env, lu2md(child), buf, name,
1724                                   LU_XATTR_CREATE);
1725                 if (rc < 0) {
1726                         CERROR("Can not setattr child "DFID": rc = %d\n",
1727                                 PFID(lu_object_fid(child)), rc);
1728                         lu_object_put(env, ec_child);
1729                         break;
1730                 }
1731                 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1732                        PFID(lu_object_fid(child)));
1733                 id++;
1734                 lu_object_put(env, ec_child);
1735         }
1736         RETURN(rc);
1737 }
1738
1739 static int echo_getattr_object(const struct lu_env *env,
1740                                struct echo_device *ed,
1741                                struct lu_object *ec_parent,
1742                                __u64 id, int count)
1743 {
1744         struct lu_object        *parent;
1745         struct echo_thread_info *info = echo_env_info(env);
1746         struct lu_name          *lname = &info->eti_lname;
1747         char                    *name = info->eti_name;
1748         struct md_attr          *ma = &info->eti_ma;
1749         struct lu_device        *ld = ed->ed_next;
1750         int                      rc = 0;
1751         int                      i;
1752
1753         ENTRY;
1754
1755         if (ec_parent == NULL)
1756                 return -1;
1757         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1758         if (parent == NULL)
1759                 RETURN(-ENXIO);
1760
1761         memset(ma, 0, sizeof(*ma));
1762         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1763         ma->ma_acl = info->eti_xattr_buf;
1764         ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1765
1766         for (i = 0; i < count; i++) {
1767                 struct lu_object *ec_child, *child;
1768
1769                 ma->ma_valid = 0;
1770                 echo_md_build_name(lname, name, id);
1771                 echo_set_lmm_size(env, ld, ma);
1772
1773                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1774                 if (IS_ERR(ec_child)) {
1775                         CERROR("Can't find child %s: rc = %ld\n",
1776                                lname->ln_name, PTR_ERR(ec_child));
1777                         RETURN(PTR_ERR(ec_child));
1778                 }
1779
1780                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1781                 if (child == NULL) {
1782                         CERROR("Can not locate the child %s\n", lname->ln_name);
1783                         lu_object_put(env, ec_child);
1784                         RETURN(-EINVAL);
1785                 }
1786
1787                 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1788                        PFID(lu_object_fid(child)));
1789                 rc = echo_attr_get_complex(env, lu2md(child), ma);
1790                 if (rc) {
1791                         CERROR("Can not getattr child "DFID": rc = %d\n",
1792                                 PFID(lu_object_fid(child)), rc);
1793                         lu_object_put(env, ec_child);
1794                         break;
1795                 }
1796                 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1797                        PFID(lu_object_fid(child)));
1798                 id++;
1799                 lu_object_put(env, ec_child);
1800         }
1801
1802         RETURN(rc);
1803 }
1804
1805 static int echo_lookup_object(const struct lu_env *env,
1806                               struct echo_device *ed,
1807                               struct lu_object *ec_parent,
1808                               __u64 id, int count)
1809 {
1810         struct lu_object        *parent;
1811         struct echo_thread_info *info = echo_env_info(env);
1812         struct lu_name          *lname = &info->eti_lname;
1813         char                    *name = info->eti_name;
1814         struct lu_fid           *fid = &info->eti_fid;
1815         struct lu_device        *ld = ed->ed_next;
1816         int                      rc = 0;
1817         int                      i;
1818
1819         if (ec_parent == NULL)
1820                 return -1;
1821         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1822         if (parent == NULL)
1823                 return -ENXIO;
1824
1825         /*prepare the requests*/
1826         for (i = 0; i < count; i++) {
1827                 echo_md_build_name(lname, name, id);
1828
1829                 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1830                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1831
1832                 rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
1833                 if (rc) {
1834                         CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1835                         break;
1836                 }
1837                 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1838                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1839
1840                 id++;
1841         }
1842         return rc;
1843 }
1844
1845 static int echo_md_destroy_internal(const struct lu_env *env,
1846                                     struct echo_device *ed,
1847                                     struct md_object *parent,
1848                                     struct lu_name *lname,
1849                                     struct md_attr *ma)
1850 {
1851         struct lu_device   *ld = ed->ed_next;
1852         struct lu_object   *ec_child;
1853         struct lu_object   *child;
1854         int                 rc;
1855
1856         ENTRY;
1857
1858         ec_child = echo_md_lookup(env, ed, parent, lname);
1859         if (IS_ERR(ec_child)) {
1860                 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1861                         PTR_ERR(ec_child));
1862                 RETURN(PTR_ERR(ec_child));
1863         }
1864
1865         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1866         if (child == NULL) {
1867                 CERROR("Can not locate the child %s\n", lname->ln_name);
1868                 GOTO(out_put, rc = -EINVAL);
1869         }
1870
1871         if (lu_object_remote(child)) {
1872                 CERROR("Can not destroy remote object %s: rc = %d\n",
1873                        lname->ln_name, -EPERM);
1874                 GOTO(out_put, rc = -EPERM);
1875         }
1876         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1877                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1878
1879         rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
1880         if (rc) {
1881                 CERROR("Can not unlink child %s: rc = %d\n",
1882                         lname->ln_name, rc);
1883                 GOTO(out_put, rc);
1884         }
1885         CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1886                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1887 out_put:
1888         lu_object_put(env, ec_child);
1889         return rc;
1890 }
1891
1892 static int echo_destroy_object(const struct lu_env *env,
1893                                struct echo_device *ed,
1894                                struct lu_object *ec_parent,
1895                                char *name, int namelen,
1896                                __u64 id, __u32 mode,
1897                                int count)
1898 {
1899         struct echo_thread_info *info = echo_env_info(env);
1900         struct lu_name          *lname = &info->eti_lname;
1901         struct md_attr          *ma = &info->eti_ma;
1902         struct lu_device        *ld = ed->ed_next;
1903         struct lu_object        *parent;
1904         int                      rc = 0;
1905         int                      i;
1906         ENTRY;
1907
1908         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1909         if (parent == NULL)
1910                 RETURN(-EINVAL);
1911
1912         memset(ma, 0, sizeof(*ma));
1913         ma->ma_attr.la_mode = mode;
1914         ma->ma_attr.la_valid = LA_CTIME;
1915         ma->ma_attr.la_ctime = cfs_time_current_64();
1916         ma->ma_need = MA_INODE;
1917         ma->ma_valid = 0;
1918
1919         if (name != NULL) {
1920                 lname->ln_name = name;
1921                 lname->ln_namelen = namelen;
1922                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1923                                               ma);
1924                 RETURN(rc);
1925         }
1926
1927         /*prepare the requests*/
1928         for (i = 0; i < count; i++) {
1929                 char *tmp_name = info->eti_name;
1930
1931                 ma->ma_valid = 0;
1932                 echo_md_build_name(lname, tmp_name, id);
1933
1934                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1935                                               ma);
1936                 if (rc) {
1937                         CERROR("Can not unlink child %s: rc = %d\n", name, rc);
1938                         break;
1939                 }
1940                 id++;
1941         }
1942
1943         RETURN(rc);
1944 }
1945
1946 static struct lu_object *echo_resolve_path(const struct lu_env *env,
1947                                            struct echo_device *ed, char *path,
1948                                            int path_len)
1949 {
1950         struct lu_device        *ld = ed->ed_next;
1951         struct echo_thread_info *info = echo_env_info(env);
1952         struct lu_fid           *fid = &info->eti_fid;
1953         struct lu_name          *lname = &info->eti_lname;
1954         struct lu_object        *parent = NULL;
1955         struct lu_object        *child = NULL;
1956         int                      rc = 0;
1957         ENTRY;
1958
1959         *fid = ed->ed_root_fid;
1960
1961         /* In the function below, .hs_keycmp resolves to
1962          * lu_obj_hop_keycmp() */
1963         /* coverity[overrun-buffer-val] */
1964         parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1965         if (IS_ERR(parent)) {
1966                 CERROR("Can not find the parent "DFID": rc = %ld\n",
1967                         PFID(fid), PTR_ERR(parent));
1968                 RETURN(parent);
1969         }
1970
1971         while (1) {
1972                 struct lu_object *ld_parent;
1973                 char *e;
1974
1975                 e = strsep(&path, "/");
1976                 if (e == NULL)
1977                         break;
1978
1979                 if (e[0] == 0) {
1980                         if (!path || path[0] == '\0')
1981                                 break;
1982                         continue;
1983                 }
1984
1985                 lname->ln_name = e;
1986                 lname->ln_namelen = strlen(e);
1987
1988                 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
1989                 if (ld_parent == NULL) {
1990                         lu_object_put(env, parent);
1991                         rc = -EINVAL;
1992                         break;
1993                 }
1994
1995                 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
1996                 lu_object_put(env, parent);
1997                 if (IS_ERR(child)) {
1998                         rc = (int)PTR_ERR(child);
1999                         CERROR("lookup %s under parent "DFID": rc = %d\n",
2000                                 lname->ln_name, PFID(lu_object_fid(ld_parent)),
2001                                 rc);
2002                         break;
2003                 }
2004                 parent = child;
2005         }
2006         if (rc)
2007                 RETURN(ERR_PTR(rc));
2008
2009         RETURN(parent);
2010 }
2011
2012 static void echo_ucred_init(struct lu_env *env)
2013 {
2014         struct lu_ucred *ucred = lu_ucred(env);
2015
2016         ucred->uc_valid = UCRED_INVALID;
2017
2018         ucred->uc_suppgids[0] = -1;
2019         ucred->uc_suppgids[1] = -1;
2020
2021         ucred->uc_uid = ucred->uc_o_uid  =
2022                                 from_kuid(&init_user_ns, current_uid());
2023         ucred->uc_gid = ucred->uc_o_gid  =
2024                                 from_kgid(&init_user_ns, current_gid());
2025         ucred->uc_fsuid = ucred->uc_o_fsuid =
2026                                 from_kuid(&init_user_ns, current_fsuid());
2027         ucred->uc_fsgid = ucred->uc_o_fsgid =
2028                                 from_kgid(&init_user_ns, current_fsgid());
2029         ucred->uc_cap = cfs_curproc_cap_pack();
2030
2031         /* remove fs privilege for non-root user. */
2032         if (ucred->uc_fsuid)
2033                 ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2034         ucred->uc_valid = UCRED_NEW;
2035 }
2036
2037 static void echo_ucred_fini(struct lu_env *env)
2038 {
2039         struct lu_ucred *ucred = lu_ucred(env);
2040         ucred->uc_valid = UCRED_INIT;
2041 }
2042
2043 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
2044 #define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
2045 static int echo_md_handler(struct echo_device *ed, int command,
2046                            char *path, int path_len, __u64 id, int count,
2047                            struct obd_ioctl_data *data)
2048 {
2049         struct echo_thread_info *info;
2050         struct lu_device      *ld = ed->ed_next;
2051         struct lu_env         *env;
2052         __u16                  refcheck;
2053         struct lu_object      *parent;
2054         char                  *name = NULL;
2055         int                    namelen = data->ioc_plen2;
2056         int                    rc = 0;
2057         ENTRY;
2058
2059         if (ld == NULL) {
2060                 CERROR("MD echo client is not being initialized properly\n");
2061                 RETURN(-EINVAL);
2062         }
2063
2064         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2065                 CERROR("Only support MDD layer right now!\n");
2066                 RETURN(-EINVAL);
2067         }
2068
2069         env = cl_env_get(&refcheck);
2070         if (IS_ERR(env))
2071                 RETURN(PTR_ERR(env));
2072
2073         rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
2074         if (rc != 0)
2075                 GOTO(out_env, rc);
2076
2077         /* init big_lmm buffer */
2078         info = echo_env_info(env);
2079         LASSERT(info->eti_big_lmm == NULL);
2080         OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2081         if (info->eti_big_lmm == NULL)
2082                 GOTO(out_env, rc = -ENOMEM);
2083         info->eti_big_lmmsize = MIN_MD_SIZE;
2084
2085         parent = echo_resolve_path(env, ed, path, path_len);
2086         if (IS_ERR(parent)) {
2087                 CERROR("Can not resolve the path %s: rc = %ld\n", path,
2088                         PTR_ERR(parent));
2089                 GOTO(out_free, rc = PTR_ERR(parent));
2090         }
2091
2092         if (namelen > 0) {
2093                 OBD_ALLOC(name, namelen + 1);
2094                 if (name == NULL)
2095                         GOTO(out_put, rc = -ENOMEM);
2096                 if (copy_from_user(name, data->ioc_pbuf2, namelen))
2097                         GOTO(out_name, rc = -EFAULT);
2098         }
2099
2100         echo_ucred_init(env);
2101
2102         switch (command) {
2103         case ECHO_MD_CREATE:
2104         case ECHO_MD_MKDIR: {
2105                 struct echo_thread_info *info = echo_env_info(env);
2106                 __u32 mode = data->ioc_obdo2.o_mode;
2107                 struct lu_fid *fid = &info->eti_fid;
2108                 int stripe_count = (int)data->ioc_obdo2.o_misc;
2109                 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2110
2111                 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2112                 if (rc != 0)
2113                         break;
2114
2115                 /* In the function below, .hs_keycmp resolves to
2116                  * lu_obj_hop_keycmp() */
2117                 /* coverity[overrun-buffer-val] */
2118                 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2119                                            id, mode, count, stripe_count,
2120                                            stripe_index);
2121                 break;
2122         }
2123         case ECHO_MD_DESTROY:
2124         case ECHO_MD_RMDIR: {
2125                 __u32 mode = data->ioc_obdo2.o_mode;
2126
2127                 rc = echo_destroy_object(env, ed, parent, name, namelen,
2128                                          id, mode, count);
2129                 break;
2130         }
2131         case ECHO_MD_LOOKUP:
2132                 rc = echo_lookup_object(env, ed, parent, id, count);
2133                 break;
2134         case ECHO_MD_GETATTR:
2135                 rc = echo_getattr_object(env, ed, parent, id, count);
2136                 break;
2137         case ECHO_MD_SETATTR:
2138                 rc = echo_setattr_object(env, ed, parent, id, count);
2139                 break;
2140         default:
2141                 CERROR("unknown command %d\n", command);
2142                 rc = -EINVAL;
2143                 break;
2144         }
2145         echo_ucred_fini(env);
2146
2147 out_name:
2148         if (name != NULL)
2149                 OBD_FREE(name, namelen + 1);
2150 out_put:
2151         lu_object_put(env, parent);
2152 out_free:
2153         LASSERT(info->eti_big_lmm);
2154         OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2155         info->eti_big_lmm = NULL;
2156         info->eti_big_lmmsize = 0;
2157 out_env:
2158         cl_env_put(env, &refcheck);
2159         return rc;
2160 }
2161 #endif /* HAVE_SERVER_SUPPORT */
2162
2163 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2164                               struct obdo *oa)
2165 {
2166         struct echo_object      *eco;
2167         struct echo_client_obd  *ec = ed->ed_ec;
2168         int created = 0;
2169         int rc;
2170         ENTRY;
2171
2172         if (!(oa->o_valid & OBD_MD_FLID) ||
2173             !(oa->o_valid & OBD_MD_FLGROUP) ||
2174             !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
2175                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2176                 RETURN(-EINVAL);
2177         }
2178
2179         if (ostid_id(&oa->o_oi) == 0)
2180                 ostid_set_id(&oa->o_oi, ++last_object_id);
2181
2182         rc = obd_create(env, ec->ec_exp, oa);
2183         if (rc != 0) {
2184                 CERROR("Cannot create objects: rc = %d\n", rc);
2185                 GOTO(failed, rc);
2186         }
2187
2188         created = 1;
2189
2190         oa->o_valid |= OBD_MD_FLID;
2191
2192         eco = cl_echo_object_find(ed, &oa->o_oi);
2193         if (IS_ERR(eco))
2194                 GOTO(failed, rc = PTR_ERR(eco));
2195         cl_echo_object_put(eco);
2196
2197         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
2198         EXIT;
2199
2200 failed:
2201         if (created && rc != 0)
2202                 obd_destroy(env, ec->ec_exp, oa);
2203
2204         if (rc != 0)
2205                 CERROR("create object failed with: rc = %d\n", rc);
2206
2207         return rc;
2208 }
2209
2210 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2211                            struct obdo *oa)
2212 {
2213         struct echo_object *eco;
2214         int rc;
2215         ENTRY;
2216
2217         if (!(oa->o_valid & OBD_MD_FLID) ||
2218             !(oa->o_valid & OBD_MD_FLGROUP) ||
2219             ostid_id(&oa->o_oi) == 0) {
2220                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2221                 RETURN(-EINVAL);
2222         }
2223
2224         rc = 0;
2225         eco = cl_echo_object_find(ed, &oa->o_oi);
2226         if (!IS_ERR(eco))
2227                 *ecop = eco;
2228         else
2229                 rc = PTR_ERR(eco);
2230
2231         RETURN(rc);
2232 }
2233
2234 static void echo_put_object(struct echo_object *eco)
2235 {
2236         int rc;
2237
2238         rc = cl_echo_object_put(eco);
2239         if (rc)
2240                 CERROR("%s: echo client drop an object failed: rc = %d\n",
2241                        eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
2242 }
2243
2244 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
2245                                          u64 offset, u64 count)
2246 {
2247         char    *addr;
2248         u64      stripe_off;
2249         u64      stripe_id;
2250         int      delta;
2251
2252         /* no partial pages on the client */
2253         LASSERT(count == PAGE_SIZE);
2254
2255         addr = kmap(page);
2256
2257         for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2258                 if (rw == OBD_BRW_WRITE) {
2259                         stripe_off = offset + delta;
2260                         stripe_id = id;
2261                 } else {
2262                         stripe_off = 0xdeadbeef00c0ffeeULL;
2263                         stripe_id = 0xdeadbeef00c0ffeeULL;
2264                 }
2265                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2266                                   stripe_off, stripe_id);
2267         }
2268
2269         kunmap(page);
2270 }
2271
2272 static int
2273 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
2274 {
2275         u64      stripe_off;
2276         u64      stripe_id;
2277         char   *addr;
2278         int     delta;
2279         int     rc;
2280         int     rc2;
2281
2282         /* no partial pages on the client */
2283         LASSERT(count == PAGE_SIZE);
2284
2285         addr = kmap(page);
2286
2287         for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2288                 stripe_off = offset + delta;
2289                 stripe_id = id;
2290
2291                 rc2 = block_debug_check("test_brw",
2292                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
2293                                         stripe_off, stripe_id);
2294                 if (rc2 != 0) {
2295                         CERROR("Error in echo object %#llx\n", id);
2296                         rc = rc2;
2297                 }
2298         }
2299
2300         kunmap(page);
2301         return rc;
2302 }
2303
2304 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2305                             struct echo_object *eco, u64 offset,
2306                             u64 count, int async)
2307 {
2308         size_t                  npages;
2309         struct brw_page        *pga;
2310         struct brw_page        *pgp;
2311         struct page            **pages;
2312         u64                      off;
2313         size_t                  i;
2314         int                     rc;
2315         int                     verify;
2316         gfp_t                   gfp_mask;
2317         u32                     brw_flags = 0;
2318         ENTRY;
2319
2320         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2321                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2322                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2323
2324         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
2325
2326         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2327
2328         if ((count & (~PAGE_MASK)) != 0)
2329                 RETURN(-EINVAL);
2330
2331         /* XXX think again with misaligned I/O */
2332         npages = count >> PAGE_SHIFT;
2333
2334         if (rw == OBD_BRW_WRITE)
2335                 brw_flags = OBD_BRW_ASYNC;
2336
2337         OBD_ALLOC(pga, npages * sizeof(*pga));
2338         if (pga == NULL)
2339                 RETURN(-ENOMEM);
2340
2341         OBD_ALLOC(pages, npages * sizeof(*pages));
2342         if (pages == NULL) {
2343                 OBD_FREE(pga, npages * sizeof(*pga));
2344                 RETURN(-ENOMEM);
2345         }
2346
2347         for (i = 0, pgp = pga, off = offset;
2348              i < npages;
2349              i++, pgp++, off += PAGE_SIZE) {
2350
2351                 LASSERT(pgp->pg == NULL);       /* for cleanup */
2352
2353                 rc = -ENOMEM;
2354                 pgp->pg = alloc_page(gfp_mask);
2355                 if (pgp->pg == NULL)
2356                         goto out;
2357
2358                 pages[i] = pgp->pg;
2359                 pgp->count = PAGE_SIZE;
2360                 pgp->off = off;
2361                 pgp->flag = brw_flags;
2362
2363                 if (verify)
2364                         echo_client_page_debug_setup(pgp->pg, rw,
2365                                                      ostid_id(&oa->o_oi), off,
2366                                                      pgp->count);
2367         }
2368
2369         /* brw mode can only be used at client */
2370         LASSERT(ed->ed_next != NULL);
2371         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2372
2373  out:
2374         if (rc != 0 || rw != OBD_BRW_READ)
2375                 verify = 0;
2376
2377         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2378                 if (pgp->pg == NULL)
2379                         continue;
2380
2381                 if (verify) {
2382                         int vrc;
2383                         vrc = echo_client_page_debug_check(pgp->pg,
2384                                                            ostid_id(&oa->o_oi),
2385                                                            pgp->off, pgp->count);
2386                         if (vrc != 0 && rc == 0)
2387                                 rc = vrc;
2388                 }
2389                 __free_page(pgp->pg);
2390         }
2391         OBD_FREE(pga, npages * sizeof(*pga));
2392         OBD_FREE(pages, npages * sizeof(*pages));
2393         RETURN(rc);
2394 }
2395
2396 static int echo_client_prep_commit(const struct lu_env *env,
2397                                    struct obd_export *exp, int rw,
2398                                    struct obdo *oa, struct echo_object *eco,
2399                                    u64 offset, u64 count,
2400                                    u64 batch, int async)
2401 {
2402         struct obd_ioobj         ioo;
2403         struct niobuf_local     *lnb;
2404         struct niobuf_remote     rnb;
2405         u64                      off;
2406         u64                      npages, tot_pages, apc;
2407         int i, ret = 0, brw_flags = 0;
2408
2409         ENTRY;
2410
2411         if (count <= 0 || (count & ~PAGE_MASK) != 0)
2412                 RETURN(-EINVAL);
2413
2414         apc = npages = batch >> PAGE_SHIFT;
2415         tot_pages = count >> PAGE_SHIFT;
2416
2417         OBD_ALLOC(lnb, apc * sizeof(struct niobuf_local));
2418         if (lnb == NULL)
2419                 RETURN(-ENOMEM);
2420
2421         if (rw == OBD_BRW_WRITE && async)
2422                 brw_flags |= OBD_BRW_ASYNC;
2423
2424         obdo_to_ioobj(oa, &ioo);
2425
2426         off = offset;
2427
2428         for (; tot_pages > 0; tot_pages -= npages) {
2429                 int lpages;
2430
2431                 if (tot_pages < npages)
2432                         npages = tot_pages;
2433
2434                 rnb.rnb_offset = off;
2435                 rnb.rnb_len = npages * PAGE_SIZE;
2436                 rnb.rnb_flags = brw_flags;
2437                 ioo.ioo_bufcnt = 1;
2438                 off += npages * PAGE_SIZE;
2439
2440                 lpages = npages;
2441                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
2442                 if (ret != 0)
2443                         GOTO(out, ret);
2444
2445                 for (i = 0; i < lpages; i++) {
2446                         struct page *page = lnb[i].lnb_page;
2447
2448                         /* read past eof? */
2449                         if (page == NULL && lnb[i].lnb_rc == 0)
2450                                 continue;
2451
2452                         if (async)
2453                                 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
2454
2455                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2456                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2457                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2458                                 continue;
2459
2460                         if (rw == OBD_BRW_WRITE)
2461                                 echo_client_page_debug_setup(page, rw,
2462                                                         ostid_id(&oa->o_oi),
2463                                                         lnb[i].lnb_file_offset,
2464                                                         lnb[i].lnb_len);
2465                         else
2466                                 echo_client_page_debug_check(page,
2467                                                         ostid_id(&oa->o_oi),
2468                                                         lnb[i].lnb_file_offset,
2469                                                         lnb[i].lnb_len);
2470                 }
2471
2472                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
2473                                    ret);
2474                 if (ret != 0)
2475                         break;
2476
2477                 /* Reuse env context. */
2478                 lu_context_exit((struct lu_context *)&env->le_ctx);
2479                 lu_context_enter((struct lu_context *)&env->le_ctx);
2480         }
2481
2482 out:
2483         OBD_FREE(lnb, apc * sizeof(struct niobuf_local));
2484
2485         RETURN(ret);
2486 }
2487
2488 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2489                                  struct obd_export *exp,
2490                                  struct obd_ioctl_data *data)
2491 {
2492         struct obd_device *obd = class_exp2obd(exp);
2493         struct echo_device *ed = obd2echo_dev(obd);
2494         struct echo_client_obd *ec = ed->ed_ec;
2495         struct obdo *oa = &data->ioc_obdo1;
2496         struct echo_object *eco;
2497         int rc;
2498         int async = 0;
2499         long test_mode;
2500         ENTRY;
2501
2502         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2503
2504         rc = echo_get_object(&eco, ed, oa);
2505         if (rc)
2506                 RETURN(rc);
2507
2508         oa->o_valid &= ~OBD_MD_FLHANDLE;
2509
2510         /* OFD/obdfilter works only via prep/commit */
2511         test_mode = (long)data->ioc_pbuf1;
2512         if (ed->ed_next == NULL && test_mode != 3) {
2513                 test_mode = 3;
2514                 data->ioc_plen1 = data->ioc_count;
2515         }
2516
2517         if (test_mode == 3)
2518                 async = 1;
2519
2520         /* Truncate batch size to maximum */
2521         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2522                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2523
2524         switch (test_mode) {
2525         case 1:
2526                 /* fall through */
2527         case 2:
2528                 rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
2529                                       data->ioc_count, async);
2530                 break;
2531         case 3:
2532                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
2533                                              data->ioc_offset, data->ioc_count,
2534                                              data->ioc_plen1, async);
2535                 break;
2536         default:
2537                 rc = -EINVAL;
2538         }
2539
2540         echo_put_object(eco);
2541
2542         RETURN(rc);
2543 }
2544
2545 static int
2546 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2547                       void *karg, void __user *uarg)
2548 {
2549 #ifdef HAVE_SERVER_SUPPORT
2550         struct tgt_session_info *tsi;
2551 #endif
2552         struct obd_device      *obd = exp->exp_obd;
2553         struct echo_device     *ed = obd2echo_dev(obd);
2554         struct echo_client_obd *ec = ed->ed_ec;
2555         struct echo_object     *eco;
2556         struct obd_ioctl_data  *data = karg;
2557         struct lu_env          *env;
2558         struct obdo            *oa;
2559         struct lu_fid           fid;
2560         int                     rw = OBD_BRW_READ;
2561         int                     rc = 0;
2562 #ifdef HAVE_SERVER_SUPPORT
2563         struct lu_context        echo_session;
2564 #endif
2565         ENTRY;
2566
2567         oa = &data->ioc_obdo1;
2568         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2569                 oa->o_valid |= OBD_MD_FLGROUP;
2570                 ostid_set_seq_echo(&oa->o_oi);
2571         }
2572
2573         /* This FID is unpacked just for validation at this point */
2574         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2575         if (rc < 0)
2576                 RETURN(rc);
2577
2578         OBD_ALLOC_PTR(env);
2579         if (env == NULL)
2580                 RETURN(-ENOMEM);
2581
2582         rc = lu_env_init(env, LCT_DT_THREAD);
2583         if (rc)
2584                 GOTO(out_alloc, rc = -ENOMEM);
2585
2586 #ifdef HAVE_SERVER_SUPPORT
2587         env->le_ses = &echo_session;
2588         rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
2589         if (unlikely(rc < 0))
2590                 GOTO(out_env, rc);
2591         lu_context_enter(env->le_ses);
2592
2593         tsi = tgt_ses_info(env);
2594         tsi->tsi_exp = ec->ec_exp;
2595         tsi->tsi_jobid = NULL;
2596 #endif
2597         switch (cmd) {
2598         case OBD_IOC_CREATE:                    /* may create echo object */
2599                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2600                         GOTO (out, rc = -EPERM);
2601
2602                 rc = echo_create_object(env, ed, oa);
2603                 GOTO(out, rc);
2604
2605 #ifdef HAVE_SERVER_SUPPORT
2606         case OBD_IOC_ECHO_MD: {
2607                 int count;
2608                 int cmd;
2609                 char *dir = NULL;
2610                 int dirlen;
2611                 __u64 id;
2612
2613                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2614                         GOTO(out, rc = -EPERM);
2615
2616                 count = data->ioc_count;
2617                 cmd = data->ioc_command;
2618
2619                 id = data->ioc_obdo2.o_oi.oi.oi_id;
2620                 dirlen = data->ioc_plen1;
2621                 OBD_ALLOC(dir, dirlen + 1);
2622                 if (dir == NULL)
2623                         GOTO(out, rc = -ENOMEM);
2624
2625                 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2626                         OBD_FREE(dir, data->ioc_plen1 + 1);
2627                         GOTO(out, rc = -EFAULT);
2628                 }
2629
2630                 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2631                 OBD_FREE(dir, dirlen + 1);
2632                 GOTO(out, rc);
2633         }
2634         case OBD_IOC_ECHO_ALLOC_SEQ: {
2635                 struct lu_env   *cl_env;
2636                 __u16            refcheck;
2637                 __u64            seq;
2638                 int              max_count;
2639
2640                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2641                         GOTO(out, rc = -EPERM);
2642
2643                 cl_env = cl_env_get(&refcheck);
2644                 if (IS_ERR(cl_env))
2645                         GOTO(out, rc = PTR_ERR(cl_env));
2646
2647                 rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
2648                                             ECHO_MD_SES_TAG);
2649                 if (rc != 0) {
2650                         cl_env_put(cl_env, &refcheck);
2651                         GOTO(out, rc);
2652                 }
2653
2654                 rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
2655                 cl_env_put(cl_env, &refcheck);
2656                 if (rc < 0) {
2657                         CERROR("%s: Can not alloc seq: rc = %d\n",
2658                                obd->obd_name, rc);
2659                         GOTO(out, rc);
2660                 }
2661
2662                 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2663                         return -EFAULT;
2664
2665                 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2666                 if (copy_to_user(data->ioc_pbuf2, &max_count,
2667                                      data->ioc_plen2))
2668                         return -EFAULT;
2669                 GOTO(out, rc);
2670         }
2671 #endif /* HAVE_SERVER_SUPPORT */
2672         case OBD_IOC_DESTROY:
2673                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2674                         GOTO (out, rc = -EPERM);
2675
2676                 rc = echo_get_object(&eco, ed, oa);
2677                 if (rc == 0) {
2678                         rc = obd_destroy(env, ec->ec_exp, oa);
2679                         if (rc == 0)
2680                                 eco->eo_deleted = 1;
2681                         echo_put_object(eco);
2682                 }
2683                 GOTO(out, rc);
2684
2685         case OBD_IOC_GETATTR:
2686                 rc = echo_get_object(&eco, ed, oa);
2687                 if (rc == 0) {
2688                         rc = obd_getattr(env, ec->ec_exp, oa);
2689                         echo_put_object(eco);
2690                 }
2691                 GOTO(out, rc);
2692
2693         case OBD_IOC_SETATTR:
2694                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2695                         GOTO (out, rc = -EPERM);
2696
2697                 rc = echo_get_object(&eco, ed, oa);
2698                 if (rc == 0) {
2699                         rc = obd_setattr(env, ec->ec_exp, oa);
2700                         echo_put_object(eco);
2701                 }
2702                 GOTO(out, rc);
2703
2704         case OBD_IOC_BRW_WRITE:
2705                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2706                         GOTO (out, rc = -EPERM);
2707
2708                 rw = OBD_BRW_WRITE;
2709                 /* fall through */
2710         case OBD_IOC_BRW_READ:
2711                 rc = echo_client_brw_ioctl(env, rw, exp, data);
2712                 GOTO(out, rc);
2713
2714         default:
2715                 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2716                 GOTO (out, rc = -ENOTTY);
2717         }
2718
2719         EXIT;
2720 out:
2721 #ifdef HAVE_SERVER_SUPPORT
2722         lu_context_exit(env->le_ses);
2723         lu_context_fini(env->le_ses);
2724 out_env:
2725 #endif
2726         lu_env_fini(env);
2727 out_alloc:
2728         OBD_FREE_PTR(env);
2729
2730         return rc;
2731 }
2732
2733 static int echo_client_setup(const struct lu_env *env,
2734                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2735 {
2736         struct echo_client_obd *ec = &obddev->u.echo_client;
2737         struct obd_device *tgt;
2738         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2739         struct obd_connect_data *ocd = NULL;
2740         int rc;
2741         ENTRY;
2742
2743         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2744                 CERROR("requires a TARGET OBD name\n");
2745                 RETURN(-EINVAL);
2746         }
2747
2748         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2749         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2750                 CERROR("device not attached or not set up (%s)\n",
2751                        lustre_cfg_string(lcfg, 1));
2752                 RETURN(-EINVAL);
2753         }
2754
2755         spin_lock_init(&ec->ec_lock);
2756         INIT_LIST_HEAD(&ec->ec_objects);
2757         INIT_LIST_HEAD(&ec->ec_locks);
2758         ec->ec_unique = 0;
2759
2760         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2761 #ifdef HAVE_SERVER_SUPPORT
2762                 lu_context_tags_update(ECHO_MD_CTX_TAG);
2763                 lu_session_tags_update(ECHO_MD_SES_TAG);
2764 #else
2765                 CERROR("Local operations are NOT supported on client side. "
2766                        "Only remote operations are supported. Metadata client "
2767                        "must be run on server side.\n");
2768 #endif
2769                 RETURN(0);
2770         }
2771
2772         OBD_ALLOC(ocd, sizeof(*ocd));
2773         if (ocd == NULL) {
2774                 CERROR("Can't alloc ocd connecting to %s\n",
2775                        lustre_cfg_string(lcfg, 1));
2776                 return -ENOMEM;
2777         }
2778
2779         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2780                                  OBD_CONNECT_BRW_SIZE |
2781                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2782                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2783                                  OBD_CONNECT_FID;
2784         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2785         ocd->ocd_version = LUSTRE_VERSION_CODE;
2786         ocd->ocd_group = FID_SEQ_ECHO;
2787
2788         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2789         if (rc == 0) {
2790                 /* Turn off pinger because it connects to tgt obd directly. */
2791                 spin_lock(&tgt->obd_dev_lock);
2792                 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2793                 spin_unlock(&tgt->obd_dev_lock);
2794         }
2795
2796         OBD_FREE(ocd, sizeof(*ocd));
2797
2798         if (rc != 0) {
2799                 CERROR("fail to connect to device %s\n",
2800                        lustre_cfg_string(lcfg, 1));
2801                 return (rc);
2802         }
2803
2804         RETURN(rc);
2805 }
2806
2807 static int echo_client_cleanup(struct obd_device *obddev)
2808 {
2809         struct echo_device *ed = obd2echo_dev(obddev);
2810         struct echo_client_obd *ec = &obddev->u.echo_client;
2811         int rc;
2812         ENTRY;
2813
2814         /*Do nothing for Metadata echo client*/
2815         if (ed == NULL )
2816                 RETURN(0);
2817
2818         if (ed->ed_next_ismd) {
2819 #ifdef HAVE_SERVER_SUPPORT
2820                 lu_context_tags_clear(ECHO_MD_CTX_TAG);
2821                 lu_session_tags_clear(ECHO_MD_SES_TAG);
2822 #else
2823                 CERROR("This is client-side only module, does not support "
2824                         "metadata echo client.\n");
2825 #endif
2826                 RETURN(0);
2827         }
2828
2829         if (!list_empty(&obddev->obd_exports)) {
2830                 CERROR("still has clients!\n");
2831                 RETURN(-EBUSY);
2832         }
2833
2834         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2835         rc = obd_disconnect(ec->ec_exp);
2836         if (rc != 0)
2837                 CERROR("fail to disconnect device: %d\n", rc);
2838
2839         RETURN(rc);
2840 }
2841
2842 static int echo_client_connect(const struct lu_env *env,
2843                                struct obd_export **exp,
2844                                struct obd_device *src, struct obd_uuid *cluuid,
2845                                struct obd_connect_data *data, void *localdata)
2846 {
2847         int                rc;
2848         struct lustre_handle conn = { 0 };
2849
2850         ENTRY;
2851         rc = class_connect(&conn, src, cluuid);
2852         if (rc == 0) {
2853                 *exp = class_conn2export(&conn);
2854         }
2855
2856         RETURN (rc);
2857 }
2858
2859 static int echo_client_disconnect(struct obd_export *exp)
2860 {
2861         int                     rc;
2862         ENTRY;
2863
2864         if (exp == NULL)
2865                 GOTO(out, rc = -EINVAL);
2866
2867         rc = class_disconnect(exp);
2868         GOTO(out, rc);
2869  out:
2870         return rc;
2871 }
2872
2873 static struct obd_ops echo_client_obd_ops = {
2874         .o_owner       = THIS_MODULE,
2875         .o_iocontrol   = echo_client_iocontrol,
2876         .o_connect     = echo_client_connect,
2877         .o_disconnect  = echo_client_disconnect
2878 };
2879
2880 static int __init obdecho_init(void)
2881 {
2882         int rc;
2883
2884         ENTRY;
2885         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2886
2887         LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2888
2889 # ifdef HAVE_SERVER_SUPPORT
2890         rc = echo_persistent_pages_init();
2891         if (rc != 0)
2892                 goto failed_0;
2893
2894         rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
2895                                  LUSTRE_ECHO_NAME, NULL);
2896         if (rc != 0)
2897                 goto failed_1;
2898 # endif
2899
2900         rc = lu_kmem_init(echo_caches);
2901         if (rc == 0) {
2902                 rc = class_register_type(&echo_client_obd_ops, NULL, true, NULL,
2903                                          LUSTRE_ECHO_CLIENT_NAME,
2904                                          &echo_device_type);
2905                 if (rc)
2906                         lu_kmem_fini(echo_caches);
2907         }
2908
2909 # ifdef HAVE_SERVER_SUPPORT
2910         if (rc == 0)
2911                 RETURN(0);
2912
2913         class_unregister_type(LUSTRE_ECHO_NAME);
2914 failed_1:
2915         echo_persistent_pages_fini();
2916 failed_0:
2917 # endif
2918         RETURN(rc);
2919 }
2920
2921 static void __exit obdecho_exit(void)
2922 {
2923         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2924         lu_kmem_fini(echo_caches);
2925
2926 #ifdef HAVE_SERVER_SUPPORT
2927         class_unregister_type(LUSTRE_ECHO_NAME);
2928         echo_persistent_pages_fini();
2929 #endif
2930 }
2931
2932 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2933 MODULE_DESCRIPTION("Lustre Echo Client test driver");
2934 MODULE_VERSION(LUSTRE_VERSION_STRING);
2935 MODULE_LICENSE("GPL");
2936
2937 module_init(obdecho_init);
2938 module_exit(obdecho_exit);
2939
2940 /** @} echo_client */