Whamcloud - gitweb
LU-6245 client: remove types abstraction from client code
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_ECHO
38
39 #include <linux/user_namespace.h>
40 #ifdef HAVE_UIDGID_HEADER
41 # include <linux/uidgid.h>
42 #endif
43 #include <libcfs/libcfs.h>
44
45 #include <obd.h>
46 #include <obd_support.h>
47 #include <obd_class.h>
48 #include <lustre_debug.h>
49 #include <lprocfs_status.h>
50 #include <cl_object.h>
51 #include <lustre_fid.h>
52 #include <lustre_acl.h>
53 #include <lustre_ioctl.h>
54 #include <lustre_net.h>
55 #ifdef HAVE_SERVER_SUPPORT
56 # include <md_object.h>
57
58 #define ETI_NAME_LEN    20
59
60 #endif /* HAVE_SERVER_SUPPORT */
61
62 #include "echo_internal.h"
63
64 /** \defgroup echo_client Echo Client
65  * @{
66  */
67
68 struct echo_device {
69         struct cl_device          ed_cl;
70         struct echo_client_obd   *ed_ec;
71
72         struct cl_site            ed_site_myself;
73         struct lu_site           *ed_site;
74         struct lu_device         *ed_next;
75         int                       ed_next_ismd;
76         struct lu_client_seq     *ed_cl_seq;
77 #ifdef HAVE_SERVER_SUPPORT
78         struct local_oid_storage *ed_los;
79         struct lu_fid             ed_root_fid;
80 #endif /* HAVE_SERVER_SUPPORT */
81 };
82
83 struct echo_object {
84         struct cl_object        eo_cl;
85         struct cl_object_header eo_hdr;
86         struct echo_device     *eo_dev;
87         struct list_head        eo_obj_chain;
88         struct lov_oinfo       *eo_oinfo;
89         atomic_t                eo_npages;
90         int                     eo_deleted;
91 };
92
93 struct echo_object_conf {
94         struct cl_object_conf   eoc_cl;
95         struct lov_oinfo      **eoc_oinfo;
96 };
97
98 struct echo_page {
99         struct cl_page_slice    ep_cl;
100         struct mutex            ep_lock;
101 };
102
103 struct echo_lock {
104         struct cl_lock_slice    el_cl;
105         struct list_head        el_chain;
106         struct echo_object     *el_object;
107         __u64                   el_cookie;
108         atomic_t                el_refcount;
109 };
110
111 #ifdef HAVE_SERVER_SUPPORT
112 static const char echo_md_root_dir_name[] = "ROOT_ECHO";
113
114 /**
115  * In order to use the values of members in struct mdd_device,
116  * we define an alias structure here.
117  */
118 struct echo_md_device {
119         struct md_device                 emd_md_dev;
120         struct obd_export               *emd_child_exp;
121         struct dt_device                *emd_child;
122         struct dt_device                *emd_bottom;
123         struct lu_fid                    emd_root_fid;
124         struct lu_fid                    emd_local_root_fid;
125 };
126 #endif /* HAVE_SERVER_SUPPORT */
127
128 static int echo_client_setup(const struct lu_env *env,
129                              struct obd_device *obddev,
130                              struct lustre_cfg *lcfg);
131 static int echo_client_cleanup(struct obd_device *obddev);
132
133
134 /** \defgroup echo_helpers Helper functions
135  * @{
136  */
137 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
138 {
139         return container_of0(dev, struct echo_device, ed_cl);
140 }
141
142 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
143 {
144         return &d->ed_cl;
145 }
146
147 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
148 {
149         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
150 }
151
152 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
153 {
154         return &eco->eo_cl;
155 }
156
157 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
158 {
159         return container_of(o, struct echo_object, eo_cl);
160 }
161
162 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
163 {
164         return container_of(s, struct echo_page, ep_cl);
165 }
166
167 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
168 {
169         return container_of(s, struct echo_lock, el_cl);
170 }
171
172 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
173 {
174         return ecl->el_cl.cls_lock;
175 }
176
177 static struct lu_context_key echo_thread_key;
178 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
179 {
180         struct echo_thread_info *info;
181         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
182         LASSERT(info != NULL);
183         return info;
184 }
185
186 static inline
187 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
188 {
189         return container_of(c, struct echo_object_conf, eoc_cl);
190 }
191
192 #ifdef HAVE_SERVER_SUPPORT
193 static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
194 {
195         return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
196 }
197
198 static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
199 {
200         return &d->emd_md_dev.md_lu_dev;
201 }
202
203 static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
204 {
205         return emd2lu_dev(d)->ld_site->ld_seq_site;
206 }
207
208 static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
209 {
210         return d->emd_md_dev.md_lu_dev.ld_obd;
211 }
212 #endif /* HAVE_SERVER_SUPPORT */
213
214 /** @} echo_helpers */
215
216 static int cl_echo_object_put(struct echo_object *eco);
217 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
218                               struct page **pages, int npages, int async);
219
220 struct echo_thread_info {
221         struct echo_object_conf eti_conf;
222         struct lustre_md        eti_md;
223
224         struct cl_2queue        eti_queue;
225         struct cl_io            eti_io;
226         struct cl_lock          eti_lock;
227         struct lu_fid           eti_fid;
228         struct lu_fid           eti_fid2;
229 #ifdef HAVE_SERVER_SUPPORT
230         struct md_op_spec       eti_spec;
231         struct lov_mds_md_v3    eti_lmm;
232         struct lov_user_md_v3   eti_lum;
233         struct md_attr          eti_ma;
234         struct lu_name          eti_lname;
235         /* per-thread values, can be re-used */
236         void                    *eti_big_lmm; /* may be vmalloc'd */
237         int                     eti_big_lmmsize;
238         char                    eti_name[ETI_NAME_LEN];
239         struct lu_buf           eti_buf;
240         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
241 #endif
242 };
243
244 /* No session used right now */
245 struct echo_session_info {
246         unsigned long dummy;
247 };
248
249 static struct kmem_cache *echo_lock_kmem;
250 static struct kmem_cache *echo_object_kmem;
251 static struct kmem_cache *echo_thread_kmem;
252 static struct kmem_cache *echo_session_kmem;
253 /* static struct kmem_cache *echo_req_kmem; */
254
255 static struct lu_kmem_descr echo_caches[] = {
256         {
257                 .ckd_cache = &echo_lock_kmem,
258                 .ckd_name  = "echo_lock_kmem",
259                 .ckd_size  = sizeof (struct echo_lock)
260         },
261         {
262                 .ckd_cache = &echo_object_kmem,
263                 .ckd_name  = "echo_object_kmem",
264                 .ckd_size  = sizeof (struct echo_object)
265         },
266         {
267                 .ckd_cache = &echo_thread_kmem,
268                 .ckd_name  = "echo_thread_kmem",
269                 .ckd_size  = sizeof (struct echo_thread_info)
270         },
271         {
272                 .ckd_cache = &echo_session_kmem,
273                 .ckd_name  = "echo_session_kmem",
274                 .ckd_size  = sizeof (struct echo_session_info)
275         },
276         {
277                 .ckd_cache = NULL
278         }
279 };
280
281 /** \defgroup echo_page Page operations
282  *
283  * Echo page operations.
284  *
285  * @{
286  */
287 static int echo_page_own(const struct lu_env *env,
288                          const struct cl_page_slice *slice,
289                          struct cl_io *io, int nonblock)
290 {
291         struct echo_page *ep = cl2echo_page(slice);
292
293         if (!nonblock)
294                 mutex_lock(&ep->ep_lock);
295         else if (!mutex_trylock(&ep->ep_lock))
296                 return -EAGAIN;
297         return 0;
298 }
299
300 static void echo_page_disown(const struct lu_env *env,
301                              const struct cl_page_slice *slice,
302                              struct cl_io *io)
303 {
304         struct echo_page *ep = cl2echo_page(slice);
305
306         LASSERT(mutex_is_locked(&ep->ep_lock));
307         mutex_unlock(&ep->ep_lock);
308 }
309
310 static void echo_page_discard(const struct lu_env *env,
311                               const struct cl_page_slice *slice,
312                               struct cl_io *unused)
313 {
314         cl_page_delete(env, slice->cpl_page);
315 }
316
317 static int echo_page_is_vmlocked(const struct lu_env *env,
318                                  const struct cl_page_slice *slice)
319 {
320         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
321                 return -EBUSY;
322         return -ENODATA;
323 }
324
325 static void echo_page_completion(const struct lu_env *env,
326                                  const struct cl_page_slice *slice,
327                                  int ioret)
328 {
329         LASSERT(slice->cpl_page->cp_sync_io != NULL);
330 }
331
332 static void echo_page_fini(const struct lu_env *env,
333                            struct cl_page_slice *slice)
334 {
335         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
336         ENTRY;
337
338         atomic_dec(&eco->eo_npages);
339         page_cache_release(slice->cpl_page->cp_vmpage);
340         EXIT;
341 }
342
343 static int echo_page_prep(const struct lu_env *env,
344                           const struct cl_page_slice *slice,
345                           struct cl_io *unused)
346 {
347         return 0;
348 }
349
350 static int echo_page_print(const struct lu_env *env,
351                            const struct cl_page_slice *slice,
352                            void *cookie, lu_printer_t printer)
353 {
354         struct echo_page *ep = cl2echo_page(slice);
355
356         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
357                    ep, mutex_is_locked(&ep->ep_lock),
358                    slice->cpl_page->cp_vmpage);
359         return 0;
360 }
361
362 static const struct cl_page_operations echo_page_ops = {
363         .cpo_own           = echo_page_own,
364         .cpo_disown        = echo_page_disown,
365         .cpo_discard       = echo_page_discard,
366         .cpo_fini          = echo_page_fini,
367         .cpo_print         = echo_page_print,
368         .cpo_is_vmlocked   = echo_page_is_vmlocked,
369         .io = {
370                 [CRT_READ] = {
371                         .cpo_prep        = echo_page_prep,
372                         .cpo_completion  = echo_page_completion,
373                 },
374                 [CRT_WRITE] = {
375                         .cpo_prep        = echo_page_prep,
376                         .cpo_completion  = echo_page_completion,
377                 }
378         }
379 };
380 /** @} echo_page */
381
382 /** \defgroup echo_lock Locking
383  *
384  * echo lock operations
385  *
386  * @{
387  */
388 static void echo_lock_fini(const struct lu_env *env,
389                            struct cl_lock_slice *slice)
390 {
391         struct echo_lock *ecl = cl2echo_lock(slice);
392
393         LASSERT(list_empty(&ecl->el_chain));
394         OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
395 }
396
397 static struct cl_lock_operations echo_lock_ops = {
398         .clo_fini      = echo_lock_fini,
399 };
400
401 /** @} echo_lock */
402
403 /** \defgroup echo_cl_ops cl_object operations
404  *
405  * operations for cl_object
406  *
407  * @{
408  */
409 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
410                           struct cl_page *page, pgoff_t index)
411 {
412         struct echo_page *ep = cl_object_page_slice(obj, page);
413         struct echo_object *eco = cl2echo_obj(obj);
414         ENTRY;
415
416         page_cache_get(page->cp_vmpage);
417         mutex_init(&ep->ep_lock);
418         cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
419         atomic_inc(&eco->eo_npages);
420         RETURN(0);
421 }
422
423 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
424                         struct cl_io *io)
425 {
426         return 0;
427 }
428
429 static int echo_lock_init(const struct lu_env *env,
430                           struct cl_object *obj, struct cl_lock *lock,
431                           const struct cl_io *unused)
432 {
433         struct echo_lock *el;
434         ENTRY;
435
436         OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
437         if (el != NULL) {
438                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
439                 el->el_object = cl2echo_obj(obj);
440                 INIT_LIST_HEAD(&el->el_chain);
441                 atomic_set(&el->el_refcount, 0);
442         }
443         RETURN(el == NULL ? -ENOMEM : 0);
444 }
445
446 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
447                          const struct cl_object_conf *conf)
448 {
449         return 0;
450 }
451
452 static const struct cl_object_operations echo_cl_obj_ops = {
453         .coo_page_init = echo_page_init,
454         .coo_lock_init = echo_lock_init,
455         .coo_io_init   = echo_io_init,
456         .coo_conf_set  = echo_conf_set
457 };
458 /** @} echo_cl_ops */
459
460 /** \defgroup echo_lu_ops lu_object operations
461  *
462  * operations for echo lu object.
463  *
464  * @{
465  */
466 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
467                             const struct lu_object_conf *conf)
468 {
469         struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
470         struct echo_client_obd *ec     = ed->ed_ec;
471         struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
472         ENTRY;
473
474         if (ed->ed_next) {
475                 struct lu_object  *below;
476                 struct lu_device  *under;
477
478                 under = ed->ed_next;
479                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
480                                                         under);
481                 if (below == NULL)
482                         RETURN(-ENOMEM);
483                 lu_object_add(obj, below);
484         }
485
486         if (!ed->ed_next_ismd) {
487                 const struct cl_object_conf *cconf = lu2cl_conf(conf);
488                 struct echo_object_conf *econf = cl2echo_conf(cconf);
489
490                 LASSERT(econf->eoc_oinfo != NULL);
491
492                 /* Transfer the oinfo pointer to eco that it won't be
493                  * freed. */
494                 eco->eo_oinfo = *econf->eoc_oinfo;
495                 *econf->eoc_oinfo = NULL;
496         } else {
497                 eco->eo_oinfo = NULL;
498         }
499
500         eco->eo_dev = ed;
501         atomic_set(&eco->eo_npages, 0);
502         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
503
504         spin_lock(&ec->ec_lock);
505         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
506         spin_unlock(&ec->ec_lock);
507
508         RETURN(0);
509 }
510
511 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
512 {
513         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
514         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
515         ENTRY;
516
517         LASSERT(atomic_read(&eco->eo_npages) == 0);
518
519         spin_lock(&ec->ec_lock);
520         list_del_init(&eco->eo_obj_chain);
521         spin_unlock(&ec->ec_lock);
522
523         lu_object_fini(obj);
524         lu_object_header_fini(obj->lo_header);
525
526         if (eco->eo_oinfo != NULL)
527                 OBD_FREE_PTR(eco->eo_oinfo);
528
529         OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
530         EXIT;
531 }
532
533 static int echo_object_print(const struct lu_env *env, void *cookie,
534                             lu_printer_t p, const struct lu_object *o)
535 {
536         struct echo_object *obj = cl2echo_obj(lu2cl(o));
537
538         return (*p)(env, cookie, "echoclient-object@%p", obj);
539 }
540
541 static const struct lu_object_operations echo_lu_obj_ops = {
542         .loo_object_init      = echo_object_init,
543         .loo_object_delete    = NULL,
544         .loo_object_release   = NULL,
545         .loo_object_free      = echo_object_free,
546         .loo_object_print     = echo_object_print,
547         .loo_object_invariant = NULL
548 };
549 /** @} echo_lu_ops */
550
551 /** \defgroup echo_lu_dev_ops  lu_device operations
552  *
553  * Operations for echo lu device.
554  *
555  * @{
556  */
557 static struct lu_object *echo_object_alloc(const struct lu_env *env,
558                                            const struct lu_object_header *hdr,
559                                            struct lu_device *dev)
560 {
561         struct echo_object *eco;
562         struct lu_object *obj = NULL;
563         ENTRY;
564
565         /* we're the top dev. */
566         LASSERT(hdr == NULL);
567         OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
568         if (eco != NULL) {
569                 struct cl_object_header *hdr = &eco->eo_hdr;
570
571                 obj = &echo_obj2cl(eco)->co_lu;
572                 cl_object_header_init(hdr);
573                 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
574
575                 lu_object_init(obj, &hdr->coh_lu, dev);
576                 lu_object_add_top(&hdr->coh_lu, obj);
577
578                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
579                 obj->lo_ops       = &echo_lu_obj_ops;
580         }
581         RETURN(obj);
582 }
583
584 static struct lu_device_operations echo_device_lu_ops = {
585         .ldo_object_alloc   = echo_object_alloc,
586 };
587
588 /** @} echo_lu_dev_ops */
589
590 /** \defgroup echo_init Setup and teardown
591  *
592  * Init and fini functions for echo client.
593  *
594  * @{
595  */
596 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
597 {
598         struct cl_site *site = &ed->ed_site_myself;
599         int rc;
600
601         /* initialize site */
602         rc = cl_site_init(site, &ed->ed_cl);
603         if (rc) {
604                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
605                 return rc;
606         }
607
608         rc = lu_site_init_finish(&site->cs_lu);
609         if (rc) {
610                 cl_site_fini(site);
611                 return rc;
612         }
613
614         ed->ed_site = &site->cs_lu;
615         return 0;
616 }
617
618 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
619 {
620         if (ed->ed_site) {
621                 if (!ed->ed_next_ismd)
622                         lu_site_fini(ed->ed_site);
623                 ed->ed_site = NULL;
624         }
625 }
626
627 static void *echo_thread_key_init(const struct lu_context *ctx,
628                                   struct lu_context_key *key)
629 {
630         struct echo_thread_info *info;
631
632         OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
633         if (info == NULL)
634                 info = ERR_PTR(-ENOMEM);
635         return info;
636 }
637
638 static void echo_thread_key_fini(const struct lu_context *ctx,
639                          struct lu_context_key *key, void *data)
640 {
641         struct echo_thread_info *info = data;
642         OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
643 }
644
645 static struct lu_context_key echo_thread_key = {
646         .lct_tags = LCT_CL_THREAD,
647         .lct_init = echo_thread_key_init,
648         .lct_fini = echo_thread_key_fini,
649 };
650
651 static void *echo_session_key_init(const struct lu_context *ctx,
652                                   struct lu_context_key *key)
653 {
654         struct echo_session_info *session;
655
656         OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
657         if (session == NULL)
658                 session = ERR_PTR(-ENOMEM);
659         return session;
660 }
661
662 static void echo_session_key_fini(const struct lu_context *ctx,
663                                  struct lu_context_key *key, void *data)
664 {
665         struct echo_session_info *session = data;
666         OBD_SLAB_FREE_PTR(session, echo_session_kmem);
667 }
668
669 static struct lu_context_key echo_session_key = {
670         .lct_tags = LCT_SESSION,
671         .lct_init = echo_session_key_init,
672         .lct_fini = echo_session_key_fini,
673 };
674
675 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
676
677 #ifdef HAVE_SERVER_SUPPORT
678 # define ECHO_SEQ_WIDTH 0xffffffff
679 static int echo_fid_init(struct echo_device *ed, char *obd_name,
680                          struct seq_server_site *ss)
681 {
682         char *prefix;
683         int rc;
684         ENTRY;
685
686         OBD_ALLOC_PTR(ed->ed_cl_seq);
687         if (ed->ed_cl_seq == NULL)
688                 RETURN(-ENOMEM);
689
690         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
691         if (prefix == NULL)
692                 GOTO(out_free_seq, rc = -ENOMEM);
693
694         snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
695
696         /* Init client side sequence-manager */
697         rc = seq_client_init(ed->ed_cl_seq, NULL,
698                              LUSTRE_SEQ_METADATA,
699                              prefix, ss->ss_server_seq);
700         ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
701         OBD_FREE(prefix, MAX_OBD_NAME + 5);
702         if (rc)
703                 GOTO(out_free_seq, rc);
704
705         RETURN(0);
706
707 out_free_seq:
708         OBD_FREE_PTR(ed->ed_cl_seq);
709         ed->ed_cl_seq = NULL;
710         RETURN(rc);
711 }
712
713 static int echo_fid_fini(struct obd_device *obddev)
714 {
715         struct echo_device *ed = obd2echo_dev(obddev);
716         ENTRY;
717
718         if (ed->ed_cl_seq != NULL) {
719                 seq_client_fini(ed->ed_cl_seq);
720                 OBD_FREE_PTR(ed->ed_cl_seq);
721                 ed->ed_cl_seq = NULL;
722         }
723
724         RETURN(0);
725 }
726
727 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
728 {
729         ENTRY;
730
731         if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
732                 local_oid_storage_fini(env, ed->ed_los);
733                 ed->ed_los = NULL;
734         }
735 }
736
737 static int
738 echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
739                           struct local_oid_storage *los,
740                           const struct lu_fid *pfid, const char *name,
741                           __u32 mode, struct lu_fid *fid)
742 {
743         struct dt_object        *parent = NULL;
744         struct dt_object        *dto = NULL;
745         int                      rc = 0;
746         ENTRY;
747
748         LASSERT(!fid_is_zero(pfid));
749         parent = dt_locate(env, emd->emd_bottom, pfid);
750         if (unlikely(IS_ERR(parent)))
751                 RETURN(PTR_ERR(parent));
752
753         /* create local file with @fid */
754         dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
755                                                  parent, name, mode);
756         if (IS_ERR(dto))
757                 GOTO(out_put, rc = PTR_ERR(dto));
758
759         *fid = *lu_object_fid(&dto->do_lu);
760         /* since stack is not fully set up the local_storage uses own stack
761          * and we should drop its object from cache */
762         lu_object_put_nocache(env, &dto->do_lu);
763
764         EXIT;
765 out_put:
766         lu_object_put(env, &parent->do_lu);
767         RETURN(rc);
768 }
769
770 static int
771 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
772                  struct echo_device *ed)
773 {
774         struct lu_fid                    fid;
775         int                              rc = 0;
776         ENTRY;
777
778         /* Setup local dirs */
779         fid.f_seq = FID_SEQ_LOCAL_NAME;
780         fid.f_oid = 1;
781         fid.f_ver = 0;
782         rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
783         if (rc != 0)
784                 RETURN(rc);
785
786         lu_echo_root_fid(&fid);
787         if (echo_md_seq_site(emd)->ss_node_id == 0) {
788                 rc = echo_md_local_file_create(env, emd, ed->ed_los,
789                                                &emd->emd_local_root_fid,
790                                                echo_md_root_dir_name, S_IFDIR |
791                                                S_IRUGO | S_IWUSR | S_IXUGO,
792                                                &fid);
793                 if (rc != 0) {
794                         CERROR("%s: create md echo root fid failed: rc = %d\n",
795                                emd2obd_dev(emd)->obd_name, rc);
796                         GOTO(out_los, rc);
797                 }
798         }
799         ed->ed_root_fid = fid;
800
801         RETURN(0);
802 out_los:
803         echo_ed_los_fini(env, ed);
804
805         RETURN(rc);
806 }
807 #endif /* HAVE_SERVER_SUPPORT */
808
809 static struct lu_device *echo_device_alloc(const struct lu_env *env,
810                                            struct lu_device_type *t,
811                                            struct lustre_cfg *cfg)
812 {
813         struct lu_device   *next;
814         struct echo_device *ed;
815         struct cl_device   *cd;
816         struct obd_device  *obd = NULL; /* to keep compiler happy */
817         struct obd_device  *tgt;
818         const char *tgt_type_name;
819         int rc;
820         int cleanup = 0;
821         ENTRY;
822
823         OBD_ALLOC_PTR(ed);
824         if (ed == NULL)
825                 GOTO(out, rc = -ENOMEM);
826
827         cleanup = 1;
828         cd = &ed->ed_cl;
829         rc = cl_device_init(cd, t);
830         if (rc)
831                 GOTO(out, rc);
832
833         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
834
835         cleanup = 2;
836         obd = class_name2obd(lustre_cfg_string(cfg, 0));
837         LASSERT(obd != NULL);
838         LASSERT(env != NULL);
839
840         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
841         if (tgt == NULL) {
842                 CERROR("Can not find tgt device %s\n",
843                         lustre_cfg_string(cfg, 1));
844                 GOTO(out, rc = -ENODEV);
845         }
846
847         next = tgt->obd_lu_dev;
848
849         if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
850                 ed->ed_next_ismd = 1;
851         } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
852                    strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
853                 ed->ed_next_ismd = 0;
854                 rc = echo_site_init(env, ed);
855                 if (rc)
856                         GOTO(out, rc);
857         } else {
858                 GOTO(out, rc = -EINVAL);
859         }
860
861         cleanup = 3;
862
863         rc = echo_client_setup(env, obd, cfg);
864         if (rc)
865                 GOTO(out, rc);
866
867         ed->ed_ec = &obd->u.echo_client;
868         cleanup = 4;
869
870         if (ed->ed_next_ismd) {
871 #ifdef HAVE_SERVER_SUPPORT
872                 /* Suppose to connect to some Metadata layer */
873                 struct lu_site          *ls = NULL;
874                 struct lu_device        *ld = NULL;
875                 struct md_device        *md = NULL;
876                 struct echo_md_device   *emd = NULL;
877                 int                      found = 0;
878
879                 if (next == NULL) {
880                         CERROR("%s is not lu device type!\n",
881                                lustre_cfg_string(cfg, 1));
882                         GOTO(out, rc = -EINVAL);
883                 }
884
885                 tgt_type_name = lustre_cfg_string(cfg, 2);
886                 if (!tgt_type_name) {
887                         CERROR("%s no type name for echo %s setup\n",
888                                 lustre_cfg_string(cfg, 1),
889                                 tgt->obd_type->typ_name);
890                         GOTO(out, rc = -EINVAL);
891                 }
892
893                 ls = next->ld_site;
894
895                 spin_lock(&ls->ls_ld_lock);
896                 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
897                         if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
898                                 found = 1;
899                                 break;
900                         }
901                 }
902                 spin_unlock(&ls->ls_ld_lock);
903
904                 if (found == 0) {
905                         CERROR("%s is not lu device type!\n",
906                                lustre_cfg_string(cfg, 1));
907                         GOTO(out, rc = -EINVAL);
908                 }
909
910                 next = ld;
911                 /* For MD echo client, it will use the site in MDS stack */
912                 ed->ed_site = ls;
913                 ed->ed_cl.cd_lu_dev.ld_site = ls;
914                 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
915                 if (rc) {
916                         CERROR("echo fid init error %d\n", rc);
917                         GOTO(out, rc);
918                 }
919
920                 md = lu2md_dev(next);
921                 emd = lu2emd_dev(&md->md_lu_dev);
922                 rc = echo_md_root_get(env, emd, ed);
923                 if (rc != 0) {
924                         CERROR("%s: get root error: rc = %d\n",
925                                 emd2obd_dev(emd)->obd_name, rc);
926                         GOTO(out, rc);
927                 }
928 #else /* !HAVE_SERVER_SUPPORT */
929                 CERROR("Local operations are NOT supported on client side. "
930                        "Only remote operations are supported. Metadata client "
931                        "must be run on server side.\n");
932                 GOTO(out, rc = -EOPNOTSUPP);
933 #endif /* HAVE_SERVER_SUPPORT */
934         } else {
935                  /* if echo client is to be stacked upon ost device, the next is
936                   * NULL since ost is not a clio device so far */
937                 if (next != NULL && !lu_device_is_cl(next))
938                         next = NULL;
939
940                 tgt_type_name = tgt->obd_type->typ_name;
941                 if (next != NULL) {
942                         LASSERT(next != NULL);
943                         if (next->ld_site != NULL)
944                                 GOTO(out, rc = -EBUSY);
945
946                         next->ld_site = ed->ed_site;
947                         rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
948                                                      next->ld_type->ldt_name,
949                                                      NULL);
950                         if (rc)
951                                 GOTO(out, rc);
952                 } else
953                         LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
954         }
955
956         ed->ed_next = next;
957         RETURN(&cd->cd_lu_dev);
958 out:
959         switch(cleanup) {
960         case 4: {
961                 int rc2;
962                 rc2 = echo_client_cleanup(obd);
963                 if (rc2)
964                         CERROR("Cleanup obd device %s error(%d)\n",
965                                obd->obd_name, rc2);
966         }
967
968         case 3:
969                 echo_site_fini(env, ed);
970         case 2:
971                 cl_device_fini(&ed->ed_cl);
972         case 1:
973                 OBD_FREE_PTR(ed);
974         case 0:
975         default:
976                 break;
977         }
978         return(ERR_PTR(rc));
979 }
980
981 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
982                           const char *name, struct lu_device *next)
983 {
984         LBUG();
985         return 0;
986 }
987
988 static struct lu_device *echo_device_fini(const struct lu_env *env,
989                                           struct lu_device *d)
990 {
991         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
992         struct lu_device *next = ed->ed_next;
993
994         while (next && !ed->ed_next_ismd)
995                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
996         return NULL;
997 }
998
999 static void echo_lock_release(const struct lu_env *env,
1000                               struct echo_lock *ecl,
1001                               int still_used)
1002 {
1003         struct cl_lock *clk = echo_lock2cl(ecl);
1004
1005         cl_lock_release(env, clk);
1006 }
1007
1008 static struct lu_device *echo_device_free(const struct lu_env *env,
1009                                           struct lu_device *d)
1010 {
1011         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
1012         struct echo_client_obd *ec   = ed->ed_ec;
1013         struct echo_object     *eco;
1014         struct lu_device       *next = ed->ed_next;
1015
1016         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
1017                ed, next);
1018
1019         lu_site_purge(env, ed->ed_site, -1);
1020
1021         /* check if there are objects still alive.
1022          * It shouldn't have any object because lu_site_purge would cleanup
1023          * all of cached objects. Anyway, probably the echo device is being
1024          * parallelly accessed.
1025          */
1026         spin_lock(&ec->ec_lock);
1027         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
1028                 eco->eo_deleted = 1;
1029         spin_unlock(&ec->ec_lock);
1030
1031         /* purge again */
1032         lu_site_purge(env, ed->ed_site, -1);
1033
1034         CDEBUG(D_INFO,
1035                "Waiting for the reference of echo object to be dropped\n");
1036
1037         /* Wait for the last reference to be dropped. */
1038         spin_lock(&ec->ec_lock);
1039         while (!list_empty(&ec->ec_objects)) {
1040                 spin_unlock(&ec->ec_lock);
1041                 CERROR("echo_client still has objects at cleanup time, "
1042                        "wait for 1 second\n");
1043                 set_current_state(TASK_UNINTERRUPTIBLE);
1044                 schedule_timeout(cfs_time_seconds(1));
1045                 lu_site_purge(env, ed->ed_site, -1);
1046                 spin_lock(&ec->ec_lock);
1047         }
1048         spin_unlock(&ec->ec_lock);
1049
1050         LASSERT(list_empty(&ec->ec_locks));
1051
1052         CDEBUG(D_INFO, "No object exists, exiting...\n");
1053
1054         echo_client_cleanup(d->ld_obd);
1055 #ifdef HAVE_SERVER_SUPPORT
1056         echo_fid_fini(d->ld_obd);
1057         echo_ed_los_fini(env, ed);
1058 #endif
1059         while (next && !ed->ed_next_ismd)
1060                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
1061
1062         LASSERT(ed->ed_site == d->ld_site);
1063         echo_site_fini(env, ed);
1064         cl_device_fini(&ed->ed_cl);
1065         OBD_FREE_PTR(ed);
1066
1067         cl_env_cache_purge(~0);
1068
1069         return NULL;
1070 }
1071
1072 static const struct lu_device_type_operations echo_device_type_ops = {
1073         .ldto_init = echo_type_init,
1074         .ldto_fini = echo_type_fini,
1075
1076         .ldto_start = echo_type_start,
1077         .ldto_stop  = echo_type_stop,
1078
1079         .ldto_device_alloc = echo_device_alloc,
1080         .ldto_device_free  = echo_device_free,
1081         .ldto_device_init  = echo_device_init,
1082         .ldto_device_fini  = echo_device_fini
1083 };
1084
1085 static struct lu_device_type echo_device_type = {
1086         .ldt_tags     = LU_DEVICE_CL,
1087         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
1088         .ldt_ops      = &echo_device_type_ops,
1089         .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1090 };
1091 /** @} echo_init */
1092
1093 /** \defgroup echo_exports Exported operations
1094  *
1095  * exporting functions to echo client
1096  *
1097  * @{
1098  */
1099
1100 /* Interfaces to echo client obd device */
1101 static struct echo_object *
1102 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
1103 {
1104         struct lu_env *env;
1105         struct echo_thread_info *info;
1106         struct echo_object_conf *conf;
1107         struct echo_object *eco;
1108         struct cl_object *obj;
1109         struct lov_oinfo *oinfo = NULL;
1110         struct lu_fid *fid;
1111         __u16  refcheck;
1112         int rc;
1113         ENTRY;
1114
1115         LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
1116         LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
1117
1118         /* Never return an object if the obd is to be freed. */
1119         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1120                 RETURN(ERR_PTR(-ENODEV));
1121
1122         env = cl_env_get(&refcheck);
1123         if (IS_ERR(env))
1124                 RETURN((void *)env);
1125
1126         info = echo_env_info(env);
1127         conf = &info->eti_conf;
1128         if (d->ed_next) {
1129                 OBD_ALLOC_PTR(oinfo);
1130                 if (oinfo == NULL)
1131                         GOTO(out, eco = ERR_PTR(-ENOMEM));
1132
1133                 oinfo->loi_oi = *oi;
1134                 conf->eoc_cl.u.coc_oinfo = oinfo;
1135         }
1136
1137         /* If echo_object_init() is successful then ownership of oinfo
1138          * is transferred to the object. */
1139         conf->eoc_oinfo = &oinfo;
1140
1141         fid = &info->eti_fid;
1142         rc = ostid_to_fid(fid, oi, 0);
1143         if (rc != 0)
1144                 GOTO(out, eco = ERR_PTR(rc));
1145
1146         /* In the function below, .hs_keycmp resolves to
1147          * lu_obj_hop_keycmp() */
1148         /* coverity[overrun-buffer-val] */
1149         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1150         if (IS_ERR(obj))
1151                 GOTO(out, eco = (void*)obj);
1152
1153         eco = cl2echo_obj(obj);
1154         if (eco->eo_deleted) {
1155                 cl_object_put(env, obj);
1156                 eco = ERR_PTR(-EAGAIN);
1157         }
1158
1159 out:
1160         if (oinfo != NULL)
1161                 OBD_FREE_PTR(oinfo);
1162
1163         cl_env_put(env, &refcheck);
1164         RETURN(eco);
1165 }
1166
1167 static int cl_echo_object_put(struct echo_object *eco)
1168 {
1169         struct lu_env *env;
1170         struct cl_object *obj = echo_obj2cl(eco);
1171         __u16  refcheck;
1172         ENTRY;
1173
1174         env = cl_env_get(&refcheck);
1175         if (IS_ERR(env))
1176                 RETURN(PTR_ERR(env));
1177
1178         /* an external function to kill an object? */
1179         if (eco->eo_deleted) {
1180                 struct lu_object_header *loh = obj->co_lu.lo_header;
1181                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1182                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1183         }
1184
1185         cl_object_put(env, obj);
1186         cl_env_put(env, &refcheck);
1187         RETURN(0);
1188 }
1189
1190 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1191                             u64 start, u64 end, int mode,
1192                             __u64 *cookie , __u32 enqflags)
1193 {
1194         struct cl_io *io;
1195         struct cl_lock *lck;
1196         struct cl_object *obj;
1197         struct cl_lock_descr *descr;
1198         struct echo_thread_info *info;
1199         int rc = -ENOMEM;
1200         ENTRY;
1201
1202         info = echo_env_info(env);
1203         io = &info->eti_io;
1204         lck = &info->eti_lock;
1205         obj = echo_obj2cl(eco);
1206
1207         memset(lck, 0, sizeof(*lck));
1208         descr = &lck->cll_descr;
1209         descr->cld_obj   = obj;
1210         descr->cld_start = cl_index(obj, start);
1211         descr->cld_end   = cl_index(obj, end);
1212         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1213         descr->cld_enq_flags = enqflags;
1214         io->ci_obj = obj;
1215
1216         rc = cl_lock_request(env, io, lck);
1217         if (rc == 0) {
1218                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1219                 struct echo_lock *el;
1220
1221                 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1222                 spin_lock(&ec->ec_lock);
1223                 if (list_empty(&el->el_chain)) {
1224                         list_add(&el->el_chain, &ec->ec_locks);
1225                         el->el_cookie = ++ec->ec_unique;
1226                 }
1227                 atomic_inc(&el->el_refcount);
1228                 *cookie = el->el_cookie;
1229                 spin_unlock(&ec->ec_lock);
1230         }
1231         RETURN(rc);
1232 }
1233
1234 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1235                            __u64 cookie)
1236 {
1237         struct echo_client_obd *ec = ed->ed_ec;
1238         struct echo_lock       *ecl = NULL;
1239         struct list_head        *el;
1240         int found = 0, still_used = 0;
1241         ENTRY;
1242
1243         LASSERT(ec != NULL);
1244         spin_lock(&ec->ec_lock);
1245         list_for_each(el, &ec->ec_locks) {
1246                 ecl = list_entry(el, struct echo_lock, el_chain);
1247                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1248                 found = (ecl->el_cookie == cookie);
1249                 if (found) {
1250                         if (atomic_dec_and_test(&ecl->el_refcount))
1251                                 list_del_init(&ecl->el_chain);
1252                         else
1253                                 still_used = 1;
1254                         break;
1255                 }
1256         }
1257         spin_unlock(&ec->ec_lock);
1258
1259         if (!found)
1260                 RETURN(-ENOENT);
1261
1262         echo_lock_release(env, ecl, still_used);
1263         RETURN(0);
1264 }
1265
1266 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1267                                 struct cl_page *page)
1268 {
1269         struct echo_thread_info *info;
1270         struct cl_2queue        *queue;
1271
1272         info = echo_env_info(env);
1273         LASSERT(io == &info->eti_io);
1274
1275         queue = &info->eti_queue;
1276         cl_page_list_add(&queue->c2_qout, page);
1277 }
1278
1279 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1280                               struct page **pages, int npages, int async)
1281 {
1282         struct lu_env           *env;
1283         struct echo_thread_info *info;
1284         struct cl_object        *obj = echo_obj2cl(eco);
1285         struct echo_device      *ed  = eco->eo_dev;
1286         struct cl_2queue        *queue;
1287         struct cl_io            *io;
1288         struct cl_page          *clp;
1289         struct lustre_handle    lh = { 0 };
1290         int page_size = cl_page_size(obj);
1291         int rc;
1292         int i;
1293         __u16 refcheck;
1294         ENTRY;
1295
1296         LASSERT((offset & ~PAGE_MASK) == 0);
1297         LASSERT(ed->ed_next != NULL);
1298         env = cl_env_get(&refcheck);
1299         if (IS_ERR(env))
1300                 RETURN(PTR_ERR(env));
1301
1302         info    = echo_env_info(env);
1303         io      = &info->eti_io;
1304         queue   = &info->eti_queue;
1305
1306         cl_2queue_init(queue);
1307
1308         io->ci_ignore_layout = 1;
1309         rc = cl_io_init(env, io, CIT_MISC, obj);
1310         if (rc < 0)
1311                 GOTO(out, rc);
1312         LASSERT(rc == 0);
1313
1314
1315         rc = cl_echo_enqueue0(env, eco, offset,
1316                               offset + npages * PAGE_CACHE_SIZE - 1,
1317                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1318                               CEF_NEVER);
1319         if (rc < 0)
1320                 GOTO(error_lock, rc);
1321
1322         for (i = 0; i < npages; i++) {
1323                 LASSERT(pages[i]);
1324                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1325                                    pages[i], CPT_TRANSIENT);
1326                 if (IS_ERR(clp)) {
1327                         rc = PTR_ERR(clp);
1328                         break;
1329                 }
1330                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1331
1332                 rc = cl_page_own(env, io, clp);
1333                 if (rc) {
1334                         LASSERT(clp->cp_state == CPS_FREEING);
1335                         cl_page_put(env, clp);
1336                         break;
1337                 }
1338
1339                 cl_2queue_add(queue, clp);
1340
1341                 /* drop the reference count for cl_page_find, so that the page
1342                  * will be freed in cl_2queue_fini. */
1343                 cl_page_put(env, clp);
1344                 cl_page_clip(env, clp, 0, page_size);
1345
1346                 offset += page_size;
1347         }
1348
1349         if (rc == 0) {
1350                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1351
1352                 async = async && (typ == CRT_WRITE);
1353                 if (async)
1354                         rc = cl_io_commit_async(env, io, &queue->c2_qin,
1355                                                 0, PAGE_SIZE,
1356                                                 echo_commit_callback);
1357                 else
1358                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1359                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1360                        async ? "async" : "sync", rc);
1361         }
1362
1363         cl_echo_cancel0(env, ed, lh.cookie);
1364         EXIT;
1365 error_lock:
1366         cl_2queue_discard(env, io, queue);
1367         cl_2queue_disown(env, io, queue);
1368         cl_2queue_fini(env, queue);
1369         cl_io_fini(env, io);
1370 out:
1371         cl_env_put(env, &refcheck);
1372         return rc;
1373 }
1374 /** @} echo_exports */
1375
1376
1377 static u64 last_object_id;
1378
1379 #ifdef HAVE_SERVER_SUPPORT
1380 static inline void echo_md_build_name(struct lu_name *lname, char *name,
1381                                       __u64 id)
1382 {
1383         snprintf(name, ETI_NAME_LEN, "%llu", id);
1384         lname->ln_name = name;
1385         lname->ln_namelen = strlen(name);
1386 }
1387
1388 /* similar to mdt_attr_get_complex */
1389 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1390                             struct md_attr *ma)
1391 {
1392         struct echo_thread_info *info = echo_env_info(env);
1393         int                      rc;
1394
1395         ENTRY;
1396
1397         LASSERT(ma->ma_lmm_size > 0);
1398
1399         rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1400         if (rc < 0)
1401                 RETURN(rc);
1402
1403         /* big_lmm may need to be grown */
1404         if (info->eti_big_lmmsize < rc) {
1405                 int size = size_roundup_power2(rc);
1406
1407                 if (info->eti_big_lmmsize > 0) {
1408                         /* free old buffer */
1409                         LASSERT(info->eti_big_lmm);
1410                         OBD_FREE_LARGE(info->eti_big_lmm,
1411                                        info->eti_big_lmmsize);
1412                         info->eti_big_lmm = NULL;
1413                         info->eti_big_lmmsize = 0;
1414                 }
1415
1416                 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1417                 if (info->eti_big_lmm == NULL)
1418                         RETURN(-ENOMEM);
1419                 info->eti_big_lmmsize = size;
1420         }
1421         LASSERT(info->eti_big_lmmsize >= rc);
1422
1423         info->eti_buf.lb_buf = info->eti_big_lmm;
1424         info->eti_buf.lb_len = info->eti_big_lmmsize;
1425         rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1426         if (rc < 0)
1427                 RETURN(rc);
1428
1429         ma->ma_valid |= MA_LOV;
1430         ma->ma_lmm = info->eti_big_lmm;
1431         ma->ma_lmm_size = rc;
1432
1433         RETURN(0);
1434 }
1435
1436 static int echo_attr_get_complex(const struct lu_env *env,
1437                                  struct md_object *next,
1438                                  struct md_attr *ma)
1439 {
1440         struct echo_thread_info *info = echo_env_info(env);
1441         struct lu_buf           *buf = &info->eti_buf;
1442         umode_t          mode = lu_object_attr(&next->mo_lu);
1443         int                      need = ma->ma_need;
1444         int                      rc = 0, rc2;
1445
1446         ENTRY;
1447
1448         ma->ma_valid = 0;
1449
1450         if (need & MA_INODE) {
1451                 ma->ma_need = MA_INODE;
1452                 rc = mo_attr_get(env, next, ma);
1453                 if (rc)
1454                         GOTO(out, rc);
1455                 ma->ma_valid |= MA_INODE;
1456         }
1457
1458         if (need & MA_LOV) {
1459                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1460                         LASSERT(ma->ma_lmm_size > 0);
1461                         buf->lb_buf = ma->ma_lmm;
1462                         buf->lb_len = ma->ma_lmm_size;
1463                         rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1464                         if (rc2 > 0) {
1465                                 ma->ma_lmm_size = rc2;
1466                                 ma->ma_valid |= MA_LOV;
1467                         } else if (rc2 == -ENODATA) {
1468                                 /* no LOV EA */
1469                                 ma->ma_lmm_size = 0;
1470                         } else if (rc2 == -ERANGE) {
1471                                 rc2 = echo_big_lmm_get(env, next, ma);
1472                                 if (rc2 < 0)
1473                                         GOTO(out, rc = rc2);
1474                         } else {
1475                                 GOTO(out, rc = rc2);
1476                         }
1477                 }
1478         }
1479
1480 #ifdef CONFIG_FS_POSIX_ACL
1481         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1482                 buf->lb_buf = ma->ma_acl;
1483                 buf->lb_len = ma->ma_acl_size;
1484                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1485                 if (rc2 > 0) {
1486                         ma->ma_acl_size = rc2;
1487                         ma->ma_valid |= MA_ACL_DEF;
1488                 } else if (rc2 == -ENODATA) {
1489                         /* no ACLs */
1490                         ma->ma_acl_size = 0;
1491                 } else {
1492                         GOTO(out, rc = rc2);
1493                 }
1494         }
1495 #endif
1496 out:
1497         ma->ma_need = need;
1498         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1499                rc, ma->ma_valid, ma->ma_lmm);
1500         RETURN(rc);
1501 }
1502
1503 static int
1504 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1505                         struct md_object *parent, struct lu_fid *fid,
1506                         struct lu_name *lname, struct md_op_spec *spec,
1507                         struct md_attr *ma)
1508 {
1509         struct lu_object        *ec_child, *child;
1510         struct lu_device        *ld = ed->ed_next;
1511         struct echo_thread_info *info = echo_env_info(env);
1512         struct lu_fid           *fid2 = &info->eti_fid2;
1513         struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
1514         int                      rc;
1515
1516         ENTRY;
1517
1518         rc = mdo_lookup(env, parent, lname, fid2, spec);
1519         if (rc == 0)
1520                 return -EEXIST;
1521         else if (rc != -ENOENT)
1522                 return rc;
1523
1524         ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1525                                      fid, &conf);
1526         if (IS_ERR(ec_child)) {
1527                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1528                         PTR_ERR(ec_child));
1529                 RETURN(PTR_ERR(ec_child));
1530         }
1531
1532         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1533         if (child == NULL) {
1534                 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1535                 GOTO(out_put, rc = -EINVAL);
1536         }
1537
1538         CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1539                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1540
1541         /*
1542          * Do not perform lookup sanity check. We know that name does not exist.
1543          */
1544         spec->sp_cr_lookup = 0;
1545         rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1546         if (rc) {
1547                 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1548                 GOTO(out_put, rc);
1549         }
1550         CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
1551                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1552         EXIT;
1553 out_put:
1554         lu_object_put(env, ec_child);
1555         return rc;
1556 }
1557
1558 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1559                              struct md_attr *ma)
1560 {
1561         struct echo_thread_info *info = echo_env_info(env);
1562
1563         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1564                 ma->ma_lmm = (void *)&info->eti_lmm;
1565                 ma->ma_lmm_size = sizeof(info->eti_lmm);
1566         } else {
1567                 LASSERT(info->eti_big_lmmsize);
1568                 ma->ma_lmm = info->eti_big_lmm;
1569                 ma->ma_lmm_size = info->eti_big_lmmsize;
1570         }
1571
1572         return 0;
1573 }
1574
1575 static int echo_create_md_object(const struct lu_env *env,
1576                                  struct echo_device *ed,
1577                                  struct lu_object *ec_parent,
1578                                  struct lu_fid *fid,
1579                                  char *name, int namelen,
1580                                  __u64 id, __u32 mode, int count,
1581                                  int stripe_count, int stripe_offset)
1582 {
1583         struct lu_object        *parent;
1584         struct echo_thread_info *info = echo_env_info(env);
1585         struct lu_name          *lname = &info->eti_lname;
1586         struct md_op_spec       *spec = &info->eti_spec;
1587         struct md_attr          *ma = &info->eti_ma;
1588         struct lu_device        *ld = ed->ed_next;
1589         int                      rc = 0;
1590         int                      i;
1591
1592         ENTRY;
1593
1594         if (ec_parent == NULL)
1595                 return -1;
1596         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1597         if (parent == NULL)
1598                 RETURN(-ENXIO);
1599
1600         memset(ma, 0, sizeof(*ma));
1601         memset(spec, 0, sizeof(*spec));
1602         if (stripe_count != 0) {
1603                 spec->sp_cr_flags |= FMODE_WRITE;
1604                 echo_set_lmm_size(env, ld, ma);
1605                 if (stripe_count != -1) {
1606                         struct lov_user_md_v3 *lum = &info->eti_lum;
1607
1608                         lum->lmm_magic = LOV_USER_MAGIC_V3;
1609                         lum->lmm_stripe_count = stripe_count;
1610                         lum->lmm_stripe_offset = stripe_offset;
1611                         lum->lmm_pattern = 0;
1612                         spec->u.sp_ea.eadata = lum;
1613                         spec->u.sp_ea.eadatalen = sizeof(*lum);
1614                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1615                 }
1616         }
1617
1618         ma->ma_attr.la_mode = mode;
1619         ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1620         ma->ma_attr.la_ctime = cfs_time_current_64();
1621
1622         if (name != NULL) {
1623                 lname->ln_name = name;
1624                 lname->ln_namelen = namelen;
1625                 /* If name is specified, only create one object by name */
1626                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1627                                              spec, ma);
1628                 RETURN(rc);
1629         }
1630
1631         /* Create multiple object sequenced by id */
1632         for (i = 0; i < count; i++) {
1633                 char *tmp_name = info->eti_name;
1634
1635                 echo_md_build_name(lname, tmp_name, id);
1636
1637                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1638                                              spec, ma);
1639                 if (rc) {
1640                         CERROR("Can not create child %s: rc = %d\n", tmp_name,
1641                                 rc);
1642                         break;
1643                 }
1644                 id++;
1645                 fid->f_oid++;
1646         }
1647
1648         RETURN(rc);
1649 }
1650
1651 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1652                                         struct echo_device *ed,
1653                                         struct md_object *parent,
1654                                         struct lu_name *lname)
1655 {
1656         struct echo_thread_info *info = echo_env_info(env);
1657         struct lu_fid           *fid = &info->eti_fid;
1658         struct lu_object        *child;
1659         int    rc;
1660         ENTRY;
1661
1662         CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1663                PFID(fid), parent);
1664         rc = mdo_lookup(env, parent, lname, fid, NULL);
1665         if (rc) {
1666                 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1667                 RETURN(ERR_PTR(rc));
1668         }
1669
1670         /* In the function below, .hs_keycmp resolves to
1671          * lu_obj_hop_keycmp() */
1672         /* coverity[overrun-buffer-val] */
1673         child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1674
1675         RETURN(child);
1676 }
1677
1678 static int echo_setattr_object(const struct lu_env *env,
1679                                struct echo_device *ed,
1680                                struct lu_object *ec_parent,
1681                                __u64 id, int count)
1682 {
1683         struct lu_object        *parent;
1684         struct echo_thread_info *info = echo_env_info(env);
1685         struct lu_name          *lname = &info->eti_lname;
1686         char                    *name = info->eti_name;
1687         struct lu_device        *ld = ed->ed_next;
1688         struct lu_buf           *buf = &info->eti_buf;
1689         int                      rc = 0;
1690         int                      i;
1691
1692         ENTRY;
1693
1694         if (ec_parent == NULL)
1695                 return -1;
1696         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1697         if (parent == NULL)
1698                 RETURN(-ENXIO);
1699
1700         for (i = 0; i < count; i++) {
1701                 struct lu_object *ec_child, *child;
1702
1703                 echo_md_build_name(lname, name, id);
1704
1705                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1706                 if (IS_ERR(ec_child)) {
1707                         CERROR("Can't find child %s: rc = %ld\n",
1708                                 lname->ln_name, PTR_ERR(ec_child));
1709                         RETURN(PTR_ERR(ec_child));
1710                 }
1711
1712                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1713                 if (child == NULL) {
1714                         CERROR("Can not locate the child %s\n", lname->ln_name);
1715                         lu_object_put(env, ec_child);
1716                         rc = -EINVAL;
1717                         break;
1718                 }
1719
1720                 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1721                        PFID(lu_object_fid(child)));
1722
1723                 buf->lb_buf = info->eti_xattr_buf;
1724                 buf->lb_len = sizeof(info->eti_xattr_buf);
1725
1726                 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1727                 rc = mo_xattr_set(env, lu2md(child), buf, name,
1728                                   LU_XATTR_CREATE);
1729                 if (rc < 0) {
1730                         CERROR("Can not setattr child "DFID": rc = %d\n",
1731                                 PFID(lu_object_fid(child)), rc);
1732                         lu_object_put(env, ec_child);
1733                         break;
1734                 }
1735                 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1736                        PFID(lu_object_fid(child)));
1737                 id++;
1738                 lu_object_put(env, ec_child);
1739         }
1740         RETURN(rc);
1741 }
1742
1743 static int echo_getattr_object(const struct lu_env *env,
1744                                struct echo_device *ed,
1745                                struct lu_object *ec_parent,
1746                                __u64 id, int count)
1747 {
1748         struct lu_object        *parent;
1749         struct echo_thread_info *info = echo_env_info(env);
1750         struct lu_name          *lname = &info->eti_lname;
1751         char                    *name = info->eti_name;
1752         struct md_attr          *ma = &info->eti_ma;
1753         struct lu_device        *ld = ed->ed_next;
1754         int                      rc = 0;
1755         int                      i;
1756
1757         ENTRY;
1758
1759         if (ec_parent == NULL)
1760                 return -1;
1761         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1762         if (parent == NULL)
1763                 RETURN(-ENXIO);
1764
1765         memset(ma, 0, sizeof(*ma));
1766         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1767         ma->ma_acl = info->eti_xattr_buf;
1768         ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1769
1770         for (i = 0; i < count; i++) {
1771                 struct lu_object *ec_child, *child;
1772
1773                 ma->ma_valid = 0;
1774                 echo_md_build_name(lname, name, id);
1775                 echo_set_lmm_size(env, ld, ma);
1776
1777                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1778                 if (IS_ERR(ec_child)) {
1779                         CERROR("Can't find child %s: rc = %ld\n",
1780                                lname->ln_name, PTR_ERR(ec_child));
1781                         RETURN(PTR_ERR(ec_child));
1782                 }
1783
1784                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1785                 if (child == NULL) {
1786                         CERROR("Can not locate the child %s\n", lname->ln_name);
1787                         lu_object_put(env, ec_child);
1788                         RETURN(-EINVAL);
1789                 }
1790
1791                 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1792                        PFID(lu_object_fid(child)));
1793                 rc = echo_attr_get_complex(env, lu2md(child), ma);
1794                 if (rc) {
1795                         CERROR("Can not getattr child "DFID": rc = %d\n",
1796                                 PFID(lu_object_fid(child)), rc);
1797                         lu_object_put(env, ec_child);
1798                         break;
1799                 }
1800                 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1801                        PFID(lu_object_fid(child)));
1802                 id++;
1803                 lu_object_put(env, ec_child);
1804         }
1805
1806         RETURN(rc);
1807 }
1808
1809 static int echo_lookup_object(const struct lu_env *env,
1810                               struct echo_device *ed,
1811                               struct lu_object *ec_parent,
1812                               __u64 id, int count)
1813 {
1814         struct lu_object        *parent;
1815         struct echo_thread_info *info = echo_env_info(env);
1816         struct lu_name          *lname = &info->eti_lname;
1817         char                    *name = info->eti_name;
1818         struct lu_fid           *fid = &info->eti_fid;
1819         struct lu_device        *ld = ed->ed_next;
1820         int                      rc = 0;
1821         int                      i;
1822
1823         if (ec_parent == NULL)
1824                 return -1;
1825         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1826         if (parent == NULL)
1827                 return -ENXIO;
1828
1829         /*prepare the requests*/
1830         for (i = 0; i < count; i++) {
1831                 echo_md_build_name(lname, name, id);
1832
1833                 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1834                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1835
1836                 rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
1837                 if (rc) {
1838                         CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1839                         break;
1840                 }
1841                 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1842                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1843
1844                 id++;
1845         }
1846         return rc;
1847 }
1848
1849 static int echo_md_destroy_internal(const struct lu_env *env,
1850                                     struct echo_device *ed,
1851                                     struct md_object *parent,
1852                                     struct lu_name *lname,
1853                                     struct md_attr *ma)
1854 {
1855         struct lu_device   *ld = ed->ed_next;
1856         struct lu_object   *ec_child;
1857         struct lu_object   *child;
1858         int                 rc;
1859
1860         ENTRY;
1861
1862         ec_child = echo_md_lookup(env, ed, parent, lname);
1863         if (IS_ERR(ec_child)) {
1864                 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1865                         PTR_ERR(ec_child));
1866                 RETURN(PTR_ERR(ec_child));
1867         }
1868
1869         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1870         if (child == NULL) {
1871                 CERROR("Can not locate the child %s\n", lname->ln_name);
1872                 GOTO(out_put, rc = -EINVAL);
1873         }
1874
1875         if (lu_object_remote(child)) {
1876                 CERROR("Can not destroy remote object %s: rc = %d\n",
1877                        lname->ln_name, -EPERM);
1878                 GOTO(out_put, rc = -EPERM);
1879         }
1880         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1881                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1882
1883         rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
1884         if (rc) {
1885                 CERROR("Can not unlink child %s: rc = %d\n",
1886                         lname->ln_name, rc);
1887                 GOTO(out_put, rc);
1888         }
1889         CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1890                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1891 out_put:
1892         lu_object_put(env, ec_child);
1893         return rc;
1894 }
1895
1896 static int echo_destroy_object(const struct lu_env *env,
1897                                struct echo_device *ed,
1898                                struct lu_object *ec_parent,
1899                                char *name, int namelen,
1900                                __u64 id, __u32 mode,
1901                                int count)
1902 {
1903         struct echo_thread_info *info = echo_env_info(env);
1904         struct lu_name          *lname = &info->eti_lname;
1905         struct md_attr          *ma = &info->eti_ma;
1906         struct lu_device        *ld = ed->ed_next;
1907         struct lu_object        *parent;
1908         int                      rc = 0;
1909         int                      i;
1910         ENTRY;
1911
1912         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1913         if (parent == NULL)
1914                 RETURN(-EINVAL);
1915
1916         memset(ma, 0, sizeof(*ma));
1917         ma->ma_attr.la_mode = mode;
1918         ma->ma_attr.la_valid = LA_CTIME;
1919         ma->ma_attr.la_ctime = cfs_time_current_64();
1920         ma->ma_need = MA_INODE;
1921         ma->ma_valid = 0;
1922
1923         if (name != NULL) {
1924                 lname->ln_name = name;
1925                 lname->ln_namelen = namelen;
1926                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1927                                               ma);
1928                 RETURN(rc);
1929         }
1930
1931         /*prepare the requests*/
1932         for (i = 0; i < count; i++) {
1933                 char *tmp_name = info->eti_name;
1934
1935                 ma->ma_valid = 0;
1936                 echo_md_build_name(lname, tmp_name, id);
1937
1938                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1939                                               ma);
1940                 if (rc) {
1941                         CERROR("Can not unlink child %s: rc = %d\n", name, rc);
1942                         break;
1943                 }
1944                 id++;
1945         }
1946
1947         RETURN(rc);
1948 }
1949
1950 static struct lu_object *echo_resolve_path(const struct lu_env *env,
1951                                            struct echo_device *ed, char *path,
1952                                            int path_len)
1953 {
1954         struct lu_device        *ld = ed->ed_next;
1955         struct echo_thread_info *info = echo_env_info(env);
1956         struct lu_fid           *fid = &info->eti_fid;
1957         struct lu_name          *lname = &info->eti_lname;
1958         struct lu_object        *parent = NULL;
1959         struct lu_object        *child = NULL;
1960         int                      rc = 0;
1961         ENTRY;
1962
1963         *fid = ed->ed_root_fid;
1964
1965         /* In the function below, .hs_keycmp resolves to
1966          * lu_obj_hop_keycmp() */
1967         /* coverity[overrun-buffer-val] */
1968         parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1969         if (IS_ERR(parent)) {
1970                 CERROR("Can not find the parent "DFID": rc = %ld\n",
1971                         PFID(fid), PTR_ERR(parent));
1972                 RETURN(parent);
1973         }
1974
1975         while (1) {
1976                 struct lu_object *ld_parent;
1977                 char *e;
1978
1979                 e = strsep(&path, "/");
1980                 if (e == NULL)
1981                         break;
1982
1983                 if (e[0] == 0) {
1984                         if (!path || path[0] == '\0')
1985                                 break;
1986                         continue;
1987                 }
1988
1989                 lname->ln_name = e;
1990                 lname->ln_namelen = strlen(e);
1991
1992                 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
1993                 if (ld_parent == NULL) {
1994                         lu_object_put(env, parent);
1995                         rc = -EINVAL;
1996                         break;
1997                 }
1998
1999                 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
2000                 lu_object_put(env, parent);
2001                 if (IS_ERR(child)) {
2002                         rc = (int)PTR_ERR(child);
2003                         CERROR("lookup %s under parent "DFID": rc = %d\n",
2004                                 lname->ln_name, PFID(lu_object_fid(ld_parent)),
2005                                 rc);
2006                         break;
2007                 }
2008                 parent = child;
2009         }
2010         if (rc)
2011                 RETURN(ERR_PTR(rc));
2012
2013         RETURN(parent);
2014 }
2015
2016 static void echo_ucred_init(struct lu_env *env)
2017 {
2018         struct lu_ucred *ucred = lu_ucred(env);
2019
2020         ucred->uc_valid = UCRED_INVALID;
2021
2022         ucred->uc_suppgids[0] = -1;
2023         ucred->uc_suppgids[1] = -1;
2024
2025         ucred->uc_uid = ucred->uc_o_uid  =
2026                                 from_kuid(&init_user_ns, current_uid());
2027         ucred->uc_gid = ucred->uc_o_gid  =
2028                                 from_kgid(&init_user_ns, current_gid());
2029         ucred->uc_fsuid = ucred->uc_o_fsuid =
2030                                 from_kuid(&init_user_ns, current_fsuid());
2031         ucred->uc_fsgid = ucred->uc_o_fsgid =
2032                                 from_kgid(&init_user_ns, current_fsgid());
2033         ucred->uc_cap = cfs_curproc_cap_pack();
2034
2035         /* remove fs privilege for non-root user. */
2036         if (ucred->uc_fsuid)
2037                 ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2038         ucred->uc_valid = UCRED_NEW;
2039 }
2040
2041 static void echo_ucred_fini(struct lu_env *env)
2042 {
2043         struct lu_ucred *ucred = lu_ucred(env);
2044         ucred->uc_valid = UCRED_INIT;
2045 }
2046
2047 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
2048 #define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
2049 static int echo_md_handler(struct echo_device *ed, int command,
2050                            char *path, int path_len, __u64 id, int count,
2051                            struct obd_ioctl_data *data)
2052 {
2053         struct echo_thread_info *info;
2054         struct lu_device      *ld = ed->ed_next;
2055         struct lu_env         *env;
2056         __u16                  refcheck;
2057         struct lu_object      *parent;
2058         char                  *name = NULL;
2059         int                    namelen = data->ioc_plen2;
2060         int                    rc = 0;
2061         ENTRY;
2062
2063         if (ld == NULL) {
2064                 CERROR("MD echo client is not being initialized properly\n");
2065                 RETURN(-EINVAL);
2066         }
2067
2068         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2069                 CERROR("Only support MDD layer right now!\n");
2070                 RETURN(-EINVAL);
2071         }
2072
2073         env = cl_env_get(&refcheck);
2074         if (IS_ERR(env))
2075                 RETURN(PTR_ERR(env));
2076
2077         rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
2078         if (rc != 0)
2079                 GOTO(out_env, rc);
2080
2081         /* init big_lmm buffer */
2082         info = echo_env_info(env);
2083         LASSERT(info->eti_big_lmm == NULL);
2084         OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2085         if (info->eti_big_lmm == NULL)
2086                 GOTO(out_env, rc = -ENOMEM);
2087         info->eti_big_lmmsize = MIN_MD_SIZE;
2088
2089         parent = echo_resolve_path(env, ed, path, path_len);
2090         if (IS_ERR(parent)) {
2091                 CERROR("Can not resolve the path %s: rc = %ld\n", path,
2092                         PTR_ERR(parent));
2093                 GOTO(out_free, rc = PTR_ERR(parent));
2094         }
2095
2096         if (namelen > 0) {
2097                 OBD_ALLOC(name, namelen + 1);
2098                 if (name == NULL)
2099                         GOTO(out_put, rc = -ENOMEM);
2100                 if (copy_from_user(name, data->ioc_pbuf2, namelen))
2101                         GOTO(out_name, rc = -EFAULT);
2102         }
2103
2104         echo_ucred_init(env);
2105
2106         switch (command) {
2107         case ECHO_MD_CREATE:
2108         case ECHO_MD_MKDIR: {
2109                 struct echo_thread_info *info = echo_env_info(env);
2110                 __u32 mode = data->ioc_obdo2.o_mode;
2111                 struct lu_fid *fid = &info->eti_fid;
2112                 int stripe_count = (int)data->ioc_obdo2.o_misc;
2113                 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2114
2115                 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2116                 if (rc != 0)
2117                         break;
2118
2119                 /* In the function below, .hs_keycmp resolves to
2120                  * lu_obj_hop_keycmp() */
2121                 /* coverity[overrun-buffer-val] */
2122                 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2123                                            id, mode, count, stripe_count,
2124                                            stripe_index);
2125                 break;
2126         }
2127         case ECHO_MD_DESTROY:
2128         case ECHO_MD_RMDIR: {
2129                 __u32 mode = data->ioc_obdo2.o_mode;
2130
2131                 rc = echo_destroy_object(env, ed, parent, name, namelen,
2132                                          id, mode, count);
2133                 break;
2134         }
2135         case ECHO_MD_LOOKUP:
2136                 rc = echo_lookup_object(env, ed, parent, id, count);
2137                 break;
2138         case ECHO_MD_GETATTR:
2139                 rc = echo_getattr_object(env, ed, parent, id, count);
2140                 break;
2141         case ECHO_MD_SETATTR:
2142                 rc = echo_setattr_object(env, ed, parent, id, count);
2143                 break;
2144         default:
2145                 CERROR("unknown command %d\n", command);
2146                 rc = -EINVAL;
2147                 break;
2148         }
2149         echo_ucred_fini(env);
2150
2151 out_name:
2152         if (name != NULL)
2153                 OBD_FREE(name, namelen + 1);
2154 out_put:
2155         lu_object_put(env, parent);
2156 out_free:
2157         LASSERT(info->eti_big_lmm);
2158         OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2159         info->eti_big_lmm = NULL;
2160         info->eti_big_lmmsize = 0;
2161 out_env:
2162         cl_env_put(env, &refcheck);
2163         return rc;
2164 }
2165 #endif /* HAVE_SERVER_SUPPORT */
2166
2167 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2168                               struct obdo *oa)
2169 {
2170         struct echo_object      *eco;
2171         struct echo_client_obd  *ec = ed->ed_ec;
2172         int created = 0;
2173         int rc;
2174         ENTRY;
2175
2176         if (!(oa->o_valid & OBD_MD_FLID) ||
2177             !(oa->o_valid & OBD_MD_FLGROUP) ||
2178             !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
2179                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2180                 RETURN(-EINVAL);
2181         }
2182
2183         if (ostid_id(&oa->o_oi) == 0)
2184                 ostid_set_id(&oa->o_oi, ++last_object_id);
2185
2186         rc = obd_create(env, ec->ec_exp, oa);
2187         if (rc != 0) {
2188                 CERROR("Cannot create objects: rc = %d\n", rc);
2189                 GOTO(failed, rc);
2190         }
2191
2192         created = 1;
2193
2194         oa->o_valid |= OBD_MD_FLID;
2195
2196         eco = cl_echo_object_find(ed, &oa->o_oi);
2197         if (IS_ERR(eco))
2198                 GOTO(failed, rc = PTR_ERR(eco));
2199         cl_echo_object_put(eco);
2200
2201         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
2202         EXIT;
2203
2204 failed:
2205         if (created && rc != 0)
2206                 obd_destroy(env, ec->ec_exp, oa);
2207
2208         if (rc != 0)
2209                 CERROR("create object failed with: rc = %d\n", rc);
2210
2211         return rc;
2212 }
2213
2214 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2215                            struct obdo *oa)
2216 {
2217         struct echo_object *eco;
2218         int rc;
2219         ENTRY;
2220
2221         if (!(oa->o_valid & OBD_MD_FLID) ||
2222             !(oa->o_valid & OBD_MD_FLGROUP) ||
2223             ostid_id(&oa->o_oi) == 0) {
2224                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2225                 RETURN(-EINVAL);
2226         }
2227
2228         rc = 0;
2229         eco = cl_echo_object_find(ed, &oa->o_oi);
2230         if (!IS_ERR(eco))
2231                 *ecop = eco;
2232         else
2233                 rc = PTR_ERR(eco);
2234
2235         RETURN(rc);
2236 }
2237
2238 static void echo_put_object(struct echo_object *eco)
2239 {
2240         int rc;
2241
2242         rc = cl_echo_object_put(eco);
2243         if (rc)
2244                 CERROR("%s: echo client drop an object failed: rc = %d\n",
2245                        eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
2246 }
2247
2248 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
2249                                          u64 offset, u64 count)
2250 {
2251         char    *addr;
2252         u64      stripe_off;
2253         u64      stripe_id;
2254         int      delta;
2255
2256         /* no partial pages on the client */
2257         LASSERT(count == PAGE_CACHE_SIZE);
2258
2259         addr = kmap(page);
2260
2261         for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2262                 if (rw == OBD_BRW_WRITE) {
2263                         stripe_off = offset + delta;
2264                         stripe_id = id;
2265                 } else {
2266                         stripe_off = 0xdeadbeef00c0ffeeULL;
2267                         stripe_id = 0xdeadbeef00c0ffeeULL;
2268                 }
2269                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2270                                   stripe_off, stripe_id);
2271         }
2272
2273         kunmap(page);
2274 }
2275
2276 static int
2277 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
2278 {
2279         u64      stripe_off;
2280         u64      stripe_id;
2281         char   *addr;
2282         int     delta;
2283         int     rc;
2284         int     rc2;
2285
2286         /* no partial pages on the client */
2287         LASSERT(count == PAGE_CACHE_SIZE);
2288
2289         addr = kmap(page);
2290
2291         for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2292                 stripe_off = offset + delta;
2293                 stripe_id = id;
2294
2295                 rc2 = block_debug_check("test_brw",
2296                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
2297                                         stripe_off, stripe_id);
2298                 if (rc2 != 0) {
2299                         CERROR("Error in echo object %#llx\n", id);
2300                         rc = rc2;
2301                 }
2302         }
2303
2304         kunmap(page);
2305         return rc;
2306 }
2307
2308 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2309                             struct echo_object *eco, u64 offset,
2310                             u64 count, int async)
2311 {
2312         size_t                  npages;
2313         struct brw_page        *pga;
2314         struct brw_page        *pgp;
2315         struct page            **pages;
2316         u64                      off;
2317         size_t                  i;
2318         int                     rc;
2319         int                     verify;
2320         gfp_t                   gfp_mask;
2321         u32                     brw_flags = 0;
2322         ENTRY;
2323
2324         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2325                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2326                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2327
2328         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
2329
2330         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2331
2332         if ((count & (~PAGE_MASK)) != 0)
2333                 RETURN(-EINVAL);
2334
2335         /* XXX think again with misaligned I/O */
2336         npages = count >> PAGE_CACHE_SHIFT;
2337
2338         if (rw == OBD_BRW_WRITE)
2339                 brw_flags = OBD_BRW_ASYNC;
2340
2341         OBD_ALLOC(pga, npages * sizeof(*pga));
2342         if (pga == NULL)
2343                 RETURN(-ENOMEM);
2344
2345         OBD_ALLOC(pages, npages * sizeof(*pages));
2346         if (pages == NULL) {
2347                 OBD_FREE(pga, npages * sizeof(*pga));
2348                 RETURN(-ENOMEM);
2349         }
2350
2351         for (i = 0, pgp = pga, off = offset;
2352              i < npages;
2353              i++, pgp++, off += PAGE_CACHE_SIZE) {
2354
2355                 LASSERT(pgp->pg == NULL);       /* for cleanup */
2356
2357                 rc = -ENOMEM;
2358                 pgp->pg = alloc_page(gfp_mask);
2359                 if (pgp->pg == NULL)
2360                         goto out;
2361
2362                 pages[i] = pgp->pg;
2363                 pgp->count = PAGE_CACHE_SIZE;
2364                 pgp->off = off;
2365                 pgp->flag = brw_flags;
2366
2367                 if (verify)
2368                         echo_client_page_debug_setup(pgp->pg, rw,
2369                                                      ostid_id(&oa->o_oi), off,
2370                                                      pgp->count);
2371         }
2372
2373         /* brw mode can only be used at client */
2374         LASSERT(ed->ed_next != NULL);
2375         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2376
2377  out:
2378         if (rc != 0 || rw != OBD_BRW_READ)
2379                 verify = 0;
2380
2381         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2382                 if (pgp->pg == NULL)
2383                         continue;
2384
2385                 if (verify) {
2386                         int vrc;
2387                         vrc = echo_client_page_debug_check(pgp->pg,
2388                                                            ostid_id(&oa->o_oi),
2389                                                            pgp->off, pgp->count);
2390                         if (vrc != 0 && rc == 0)
2391                                 rc = vrc;
2392                 }
2393                 __free_page(pgp->pg);
2394         }
2395         OBD_FREE(pga, npages * sizeof(*pga));
2396         OBD_FREE(pages, npages * sizeof(*pages));
2397         RETURN(rc);
2398 }
2399
2400 static int echo_client_prep_commit(const struct lu_env *env,
2401                                    struct obd_export *exp, int rw,
2402                                    struct obdo *oa, struct echo_object *eco,
2403                                    u64 offset, u64 count,
2404                                    u64 batch, int async)
2405 {
2406         struct obd_ioobj         ioo;
2407         struct niobuf_local     *lnb;
2408         struct niobuf_remote     rnb;
2409         u64                      off;
2410         u64                      npages, tot_pages, apc;
2411         int i, ret = 0, brw_flags = 0;
2412
2413         ENTRY;
2414
2415         if (count <= 0 || (count & ~PAGE_CACHE_MASK) != 0)
2416                 RETURN(-EINVAL);
2417
2418         apc = npages = batch >> PAGE_CACHE_SHIFT;
2419         tot_pages = count >> PAGE_CACHE_SHIFT;
2420
2421         OBD_ALLOC(lnb, apc * sizeof(struct niobuf_local));
2422         if (lnb == NULL)
2423                 RETURN(-ENOMEM);
2424
2425         if (rw == OBD_BRW_WRITE && async)
2426                 brw_flags |= OBD_BRW_ASYNC;
2427
2428         obdo_to_ioobj(oa, &ioo);
2429
2430         off = offset;
2431
2432         for (; tot_pages > 0; tot_pages -= npages) {
2433                 int lpages;
2434
2435                 if (tot_pages < npages)
2436                         npages = tot_pages;
2437
2438                 rnb.rnb_offset = off;
2439                 rnb.rnb_len = npages * PAGE_CACHE_SIZE;
2440                 rnb.rnb_flags = brw_flags;
2441                 ioo.ioo_bufcnt = 1;
2442                 off += npages * PAGE_CACHE_SIZE;
2443
2444                 lpages = npages;
2445                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
2446                 if (ret != 0)
2447                         GOTO(out, ret);
2448
2449                 for (i = 0; i < lpages; i++) {
2450                         struct page *page = lnb[i].lnb_page;
2451
2452                         /* read past eof? */
2453                         if (page == NULL && lnb[i].lnb_rc == 0)
2454                                 continue;
2455
2456                         if (async)
2457                                 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
2458
2459                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2460                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2461                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2462                                 continue;
2463
2464                         if (rw == OBD_BRW_WRITE)
2465                                 echo_client_page_debug_setup(page, rw,
2466                                                         ostid_id(&oa->o_oi),
2467                                                         lnb[i].lnb_file_offset,
2468                                                         lnb[i].lnb_len);
2469                         else
2470                                 echo_client_page_debug_check(page,
2471                                                         ostid_id(&oa->o_oi),
2472                                                         lnb[i].lnb_file_offset,
2473                                                         lnb[i].lnb_len);
2474                 }
2475
2476                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
2477                                    ret);
2478                 if (ret != 0)
2479                         break;
2480
2481                 /* Reuse env context. */
2482                 lu_context_exit((struct lu_context *)&env->le_ctx);
2483                 lu_context_enter((struct lu_context *)&env->le_ctx);
2484         }
2485
2486 out:
2487         OBD_FREE(lnb, apc * sizeof(struct niobuf_local));
2488
2489         RETURN(ret);
2490 }
2491
2492 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2493                                  struct obd_export *exp,
2494                                  struct obd_ioctl_data *data)
2495 {
2496         struct obd_device *obd = class_exp2obd(exp);
2497         struct echo_device *ed = obd2echo_dev(obd);
2498         struct echo_client_obd *ec = ed->ed_ec;
2499         struct obdo *oa = &data->ioc_obdo1;
2500         struct echo_object *eco;
2501         int rc;
2502         int async = 0;
2503         long test_mode;
2504         ENTRY;
2505
2506         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2507
2508         rc = echo_get_object(&eco, ed, oa);
2509         if (rc)
2510                 RETURN(rc);
2511
2512         oa->o_valid &= ~OBD_MD_FLHANDLE;
2513
2514         /* OFD/obdfilter works only via prep/commit */
2515         test_mode = (long)data->ioc_pbuf1;
2516         if (ed->ed_next == NULL && test_mode != 3) {
2517                 test_mode = 3;
2518                 data->ioc_plen1 = data->ioc_count;
2519         }
2520
2521         if (test_mode == 3)
2522                 async = 1;
2523
2524         /* Truncate batch size to maximum */
2525         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2526                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2527
2528         switch (test_mode) {
2529         case 1:
2530                 /* fall through */
2531         case 2:
2532                 rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
2533                                       data->ioc_count, async);
2534                 break;
2535         case 3:
2536                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
2537                                              data->ioc_offset, data->ioc_count,
2538                                              data->ioc_plen1, async);
2539                 break;
2540         default:
2541                 rc = -EINVAL;
2542         }
2543
2544         echo_put_object(eco);
2545
2546         RETURN(rc);
2547 }
2548
2549 static int
2550 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2551                       void *karg, void __user *uarg)
2552 {
2553 #ifdef HAVE_SERVER_SUPPORT
2554         struct tgt_session_info *tsi;
2555 #endif
2556         struct obd_device      *obd = exp->exp_obd;
2557         struct echo_device     *ed = obd2echo_dev(obd);
2558         struct echo_client_obd *ec = ed->ed_ec;
2559         struct echo_object     *eco;
2560         struct obd_ioctl_data  *data = karg;
2561         struct lu_env          *env;
2562         struct obdo            *oa;
2563         struct lu_fid           fid;
2564         int                     rw = OBD_BRW_READ;
2565         int                     rc = 0;
2566 #ifdef HAVE_SERVER_SUPPORT
2567         struct lu_context        echo_session;
2568 #endif
2569         ENTRY;
2570
2571         oa = &data->ioc_obdo1;
2572         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2573                 oa->o_valid |= OBD_MD_FLGROUP;
2574                 ostid_set_seq_echo(&oa->o_oi);
2575         }
2576
2577         /* This FID is unpacked just for validation at this point */
2578         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2579         if (rc < 0)
2580                 RETURN(rc);
2581
2582         OBD_ALLOC_PTR(env);
2583         if (env == NULL)
2584                 RETURN(-ENOMEM);
2585
2586         rc = lu_env_init(env, LCT_DT_THREAD);
2587         if (rc)
2588                 GOTO(out_alloc, rc = -ENOMEM);
2589
2590 #ifdef HAVE_SERVER_SUPPORT
2591         env->le_ses = &echo_session;
2592         rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
2593         if (unlikely(rc < 0))
2594                 GOTO(out_env, rc);
2595         lu_context_enter(env->le_ses);
2596
2597         tsi = tgt_ses_info(env);
2598         tsi->tsi_exp = ec->ec_exp;
2599         tsi->tsi_jobid = NULL;
2600 #endif
2601         switch (cmd) {
2602         case OBD_IOC_CREATE:                    /* may create echo object */
2603                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2604                         GOTO (out, rc = -EPERM);
2605
2606                 rc = echo_create_object(env, ed, oa);
2607                 GOTO(out, rc);
2608
2609 #ifdef HAVE_SERVER_SUPPORT
2610         case OBD_IOC_ECHO_MD: {
2611                 int count;
2612                 int cmd;
2613                 char *dir = NULL;
2614                 int dirlen;
2615                 __u64 id;
2616
2617                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2618                         GOTO(out, rc = -EPERM);
2619
2620                 count = data->ioc_count;
2621                 cmd = data->ioc_command;
2622
2623                 id = data->ioc_obdo2.o_oi.oi.oi_id;
2624                 dirlen = data->ioc_plen1;
2625                 OBD_ALLOC(dir, dirlen + 1);
2626                 if (dir == NULL)
2627                         GOTO(out, rc = -ENOMEM);
2628
2629                 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2630                         OBD_FREE(dir, data->ioc_plen1 + 1);
2631                         GOTO(out, rc = -EFAULT);
2632                 }
2633
2634                 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2635                 OBD_FREE(dir, dirlen + 1);
2636                 GOTO(out, rc);
2637         }
2638         case OBD_IOC_ECHO_ALLOC_SEQ: {
2639                 struct lu_env   *cl_env;
2640                 __u16            refcheck;
2641                 __u64            seq;
2642                 int              max_count;
2643
2644                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2645                         GOTO(out, rc = -EPERM);
2646
2647                 cl_env = cl_env_get(&refcheck);
2648                 if (IS_ERR(cl_env))
2649                         GOTO(out, rc = PTR_ERR(cl_env));
2650
2651                 rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
2652                                             ECHO_MD_SES_TAG);
2653                 if (rc != 0) {
2654                         cl_env_put(cl_env, &refcheck);
2655                         GOTO(out, rc);
2656                 }
2657
2658                 rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
2659                 cl_env_put(cl_env, &refcheck);
2660                 if (rc < 0) {
2661                         CERROR("%s: Can not alloc seq: rc = %d\n",
2662                                obd->obd_name, rc);
2663                         GOTO(out, rc);
2664                 }
2665
2666                 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2667                         return -EFAULT;
2668
2669                 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2670                 if (copy_to_user(data->ioc_pbuf2, &max_count,
2671                                      data->ioc_plen2))
2672                         return -EFAULT;
2673                 GOTO(out, rc);
2674         }
2675 #endif /* HAVE_SERVER_SUPPORT */
2676         case OBD_IOC_DESTROY:
2677                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2678                         GOTO (out, rc = -EPERM);
2679
2680                 rc = echo_get_object(&eco, ed, oa);
2681                 if (rc == 0) {
2682                         rc = obd_destroy(env, ec->ec_exp, oa);
2683                         if (rc == 0)
2684                                 eco->eo_deleted = 1;
2685                         echo_put_object(eco);
2686                 }
2687                 GOTO(out, rc);
2688
2689         case OBD_IOC_GETATTR:
2690                 rc = echo_get_object(&eco, ed, oa);
2691                 if (rc == 0) {
2692                         rc = obd_getattr(env, ec->ec_exp, oa);
2693                         echo_put_object(eco);
2694                 }
2695                 GOTO(out, rc);
2696
2697         case OBD_IOC_SETATTR:
2698                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2699                         GOTO (out, rc = -EPERM);
2700
2701                 rc = echo_get_object(&eco, ed, oa);
2702                 if (rc == 0) {
2703                         rc = obd_setattr(env, ec->ec_exp, oa);
2704                         echo_put_object(eco);
2705                 }
2706                 GOTO(out, rc);
2707
2708         case OBD_IOC_BRW_WRITE:
2709                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2710                         GOTO (out, rc = -EPERM);
2711
2712                 rw = OBD_BRW_WRITE;
2713                 /* fall through */
2714         case OBD_IOC_BRW_READ:
2715                 rc = echo_client_brw_ioctl(env, rw, exp, data);
2716                 GOTO(out, rc);
2717
2718         default:
2719                 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2720                 GOTO (out, rc = -ENOTTY);
2721         }
2722
2723         EXIT;
2724 out:
2725 #ifdef HAVE_SERVER_SUPPORT
2726         lu_context_exit(env->le_ses);
2727         lu_context_fini(env->le_ses);
2728 out_env:
2729 #endif
2730         lu_env_fini(env);
2731 out_alloc:
2732         OBD_FREE_PTR(env);
2733
2734         return rc;
2735 }
2736
2737 static int echo_client_setup(const struct lu_env *env,
2738                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2739 {
2740         struct echo_client_obd *ec = &obddev->u.echo_client;
2741         struct obd_device *tgt;
2742         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2743         struct obd_connect_data *ocd = NULL;
2744         int rc;
2745         ENTRY;
2746
2747         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2748                 CERROR("requires a TARGET OBD name\n");
2749                 RETURN(-EINVAL);
2750         }
2751
2752         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2753         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2754                 CERROR("device not attached or not set up (%s)\n",
2755                        lustre_cfg_string(lcfg, 1));
2756                 RETURN(-EINVAL);
2757         }
2758
2759         spin_lock_init(&ec->ec_lock);
2760         INIT_LIST_HEAD(&ec->ec_objects);
2761         INIT_LIST_HEAD(&ec->ec_locks);
2762         ec->ec_unique = 0;
2763
2764         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2765 #ifdef HAVE_SERVER_SUPPORT
2766                 lu_context_tags_update(ECHO_MD_CTX_TAG);
2767                 lu_session_tags_update(ECHO_MD_SES_TAG);
2768 #else
2769                 CERROR("Local operations are NOT supported on client side. "
2770                        "Only remote operations are supported. Metadata client "
2771                        "must be run on server side.\n");
2772 #endif
2773                 RETURN(0);
2774         }
2775
2776         OBD_ALLOC(ocd, sizeof(*ocd));
2777         if (ocd == NULL) {
2778                 CERROR("Can't alloc ocd connecting to %s\n",
2779                        lustre_cfg_string(lcfg, 1));
2780                 return -ENOMEM;
2781         }
2782
2783         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2784                                  OBD_CONNECT_BRW_SIZE |
2785                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2786                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2787                                  OBD_CONNECT_FID;
2788         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2789         ocd->ocd_version = LUSTRE_VERSION_CODE;
2790         ocd->ocd_group = FID_SEQ_ECHO;
2791
2792         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2793         if (rc == 0) {
2794                 /* Turn off pinger because it connects to tgt obd directly. */
2795                 spin_lock(&tgt->obd_dev_lock);
2796                 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2797                 spin_unlock(&tgt->obd_dev_lock);
2798         }
2799
2800         OBD_FREE(ocd, sizeof(*ocd));
2801
2802         if (rc != 0) {
2803                 CERROR("fail to connect to device %s\n",
2804                        lustre_cfg_string(lcfg, 1));
2805                 return (rc);
2806         }
2807
2808         RETURN(rc);
2809 }
2810
2811 static int echo_client_cleanup(struct obd_device *obddev)
2812 {
2813         struct echo_device *ed = obd2echo_dev(obddev);
2814         struct echo_client_obd *ec = &obddev->u.echo_client;
2815         int rc;
2816         ENTRY;
2817
2818         /*Do nothing for Metadata echo client*/
2819         if (ed == NULL )
2820                 RETURN(0);
2821
2822         if (ed->ed_next_ismd) {
2823 #ifdef HAVE_SERVER_SUPPORT
2824                 lu_context_tags_clear(ECHO_MD_CTX_TAG);
2825                 lu_session_tags_clear(ECHO_MD_SES_TAG);
2826 #else
2827                 CERROR("This is client-side only module, does not support "
2828                         "metadata echo client.\n");
2829 #endif
2830                 RETURN(0);
2831         }
2832
2833         if (!list_empty(&obddev->obd_exports)) {
2834                 CERROR("still has clients!\n");
2835                 RETURN(-EBUSY);
2836         }
2837
2838         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2839         rc = obd_disconnect(ec->ec_exp);
2840         if (rc != 0)
2841                 CERROR("fail to disconnect device: %d\n", rc);
2842
2843         RETURN(rc);
2844 }
2845
2846 static int echo_client_connect(const struct lu_env *env,
2847                                struct obd_export **exp,
2848                                struct obd_device *src, struct obd_uuid *cluuid,
2849                                struct obd_connect_data *data, void *localdata)
2850 {
2851         int                rc;
2852         struct lustre_handle conn = { 0 };
2853
2854         ENTRY;
2855         rc = class_connect(&conn, src, cluuid);
2856         if (rc == 0) {
2857                 *exp = class_conn2export(&conn);
2858         }
2859
2860         RETURN (rc);
2861 }
2862
2863 static int echo_client_disconnect(struct obd_export *exp)
2864 {
2865         int                     rc;
2866         ENTRY;
2867
2868         if (exp == NULL)
2869                 GOTO(out, rc = -EINVAL);
2870
2871         rc = class_disconnect(exp);
2872         GOTO(out, rc);
2873  out:
2874         return rc;
2875 }
2876
2877 static struct obd_ops echo_client_obd_ops = {
2878         .o_owner       = THIS_MODULE,
2879         .o_iocontrol   = echo_client_iocontrol,
2880         .o_connect     = echo_client_connect,
2881         .o_disconnect  = echo_client_disconnect
2882 };
2883
2884 static int __init obdecho_init(void)
2885 {
2886         int rc;
2887
2888         ENTRY;
2889         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2890
2891         LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2892
2893 # ifdef HAVE_SERVER_SUPPORT
2894         rc = echo_persistent_pages_init();
2895         if (rc != 0)
2896                 goto failed_0;
2897
2898         rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
2899                                  LUSTRE_ECHO_NAME, NULL);
2900         if (rc != 0)
2901                 goto failed_1;
2902 # endif
2903
2904         rc = lu_kmem_init(echo_caches);
2905         if (rc == 0) {
2906                 rc = class_register_type(&echo_client_obd_ops, NULL, true, NULL,
2907                                          LUSTRE_ECHO_CLIENT_NAME,
2908                                          &echo_device_type);
2909                 if (rc)
2910                         lu_kmem_fini(echo_caches);
2911         }
2912
2913 # ifdef HAVE_SERVER_SUPPORT
2914         if (rc == 0)
2915                 RETURN(0);
2916
2917         class_unregister_type(LUSTRE_ECHO_NAME);
2918 failed_1:
2919         echo_persistent_pages_fini();
2920 failed_0:
2921 # endif
2922         RETURN(rc);
2923 }
2924
2925 static void __exit obdecho_exit(void)
2926 {
2927         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2928         lu_kmem_fini(echo_caches);
2929
2930 #ifdef HAVE_SERVER_SUPPORT
2931         class_unregister_type(LUSTRE_ECHO_NAME);
2932         echo_persistent_pages_fini();
2933 #endif
2934 }
2935
2936 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2937 MODULE_DESCRIPTION("Lustre Echo Client test driver");
2938 MODULE_VERSION(LUSTRE_VERSION_STRING);
2939 MODULE_LICENSE("GPL");
2940
2941 module_init(obdecho_init);
2942 module_exit(obdecho_exit);
2943
2944 /** @} echo_client */