Whamcloud - gitweb
LU-7988 hsm: run HSM coordinator once per second at most
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_ECHO
34
35 #include <linux/user_namespace.h>
36 #ifdef HAVE_UIDGID_HEADER
37 # include <linux/uidgid.h>
38 #endif
39 #include <libcfs/libcfs.h>
40
41 #include <obd.h>
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <lustre_debug.h>
45 #include <lprocfs_status.h>
46 #include <cl_object.h>
47 #include <lustre_fid.h>
48 #include <lustre_acl.h>
49 #include <uapi/linux/lustre_ioctl.h>
50 #include <lustre_net.h>
51 #ifdef HAVE_SERVER_SUPPORT
52 # include <md_object.h>
53
54 #define ETI_NAME_LEN    20
55
56 #endif /* HAVE_SERVER_SUPPORT */
57
58 #include "echo_internal.h"
59
60 /** \defgroup echo_client Echo Client
61  * @{
62  */
63
64 struct echo_device {
65         struct cl_device          ed_cl;
66         struct echo_client_obd   *ed_ec;
67
68         struct cl_site            ed_site_myself;
69         struct lu_site           *ed_site;
70         struct lu_device         *ed_next;
71         int                       ed_next_ismd;
72         struct lu_client_seq     *ed_cl_seq;
73 #ifdef HAVE_SERVER_SUPPORT
74         struct local_oid_storage *ed_los;
75         struct lu_fid             ed_root_fid;
76 #endif /* HAVE_SERVER_SUPPORT */
77 };
78
79 struct echo_object {
80         struct cl_object        eo_cl;
81         struct cl_object_header eo_hdr;
82         struct echo_device     *eo_dev;
83         struct list_head        eo_obj_chain;
84         struct lov_oinfo       *eo_oinfo;
85         atomic_t                eo_npages;
86         int                     eo_deleted;
87 };
88
89 struct echo_object_conf {
90         struct cl_object_conf   eoc_cl;
91         struct lov_oinfo      **eoc_oinfo;
92 };
93
94 struct echo_page {
95         struct cl_page_slice    ep_cl;
96         struct mutex            ep_lock;
97 };
98
99 struct echo_lock {
100         struct cl_lock_slice    el_cl;
101         struct list_head        el_chain;
102         struct echo_object     *el_object;
103         __u64                   el_cookie;
104         atomic_t                el_refcount;
105 };
106
107 #ifdef HAVE_SERVER_SUPPORT
108 static const char echo_md_root_dir_name[] = "ROOT_ECHO";
109
110 /**
111  * In order to use the values of members in struct mdd_device,
112  * we define an alias structure here.
113  */
114 struct echo_md_device {
115         struct md_device                 emd_md_dev;
116         struct obd_export               *emd_child_exp;
117         struct dt_device                *emd_child;
118         struct dt_device                *emd_bottom;
119         struct lu_fid                    emd_root_fid;
120         struct lu_fid                    emd_local_root_fid;
121 };
122 #endif /* HAVE_SERVER_SUPPORT */
123
124 static int echo_client_setup(const struct lu_env *env,
125                              struct obd_device *obddev,
126                              struct lustre_cfg *lcfg);
127 static int echo_client_cleanup(struct obd_device *obddev);
128
129
130 /** \defgroup echo_helpers Helper functions
131  * @{
132  */
133 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
134 {
135         return container_of0(dev, struct echo_device, ed_cl);
136 }
137
138 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
139 {
140         return &d->ed_cl;
141 }
142
143 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
144 {
145         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
146 }
147
148 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
149 {
150         return &eco->eo_cl;
151 }
152
153 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
154 {
155         return container_of(o, struct echo_object, eo_cl);
156 }
157
158 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
159 {
160         return container_of(s, struct echo_page, ep_cl);
161 }
162
163 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
164 {
165         return container_of(s, struct echo_lock, el_cl);
166 }
167
168 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
169 {
170         return ecl->el_cl.cls_lock;
171 }
172
173 static struct lu_context_key echo_thread_key;
174 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
175 {
176         struct echo_thread_info *info;
177         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
178         LASSERT(info != NULL);
179         return info;
180 }
181
182 static inline
183 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
184 {
185         return container_of(c, struct echo_object_conf, eoc_cl);
186 }
187
188 #ifdef HAVE_SERVER_SUPPORT
189 static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
190 {
191         return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
192 }
193
194 static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
195 {
196         return &d->emd_md_dev.md_lu_dev;
197 }
198
199 static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
200 {
201         return emd2lu_dev(d)->ld_site->ld_seq_site;
202 }
203
204 static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
205 {
206         return d->emd_md_dev.md_lu_dev.ld_obd;
207 }
208 #endif /* HAVE_SERVER_SUPPORT */
209
210 /** @} echo_helpers */
211
212 static int cl_echo_object_put(struct echo_object *eco);
213 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
214                               struct page **pages, int npages, int async);
215
216 struct echo_thread_info {
217         struct echo_object_conf eti_conf;
218         struct lustre_md        eti_md;
219
220         struct cl_2queue        eti_queue;
221         struct cl_io            eti_io;
222         struct cl_lock          eti_lock;
223         struct lu_fid           eti_fid;
224         struct lu_fid           eti_fid2;
225 #ifdef HAVE_SERVER_SUPPORT
226         struct md_op_spec       eti_spec;
227         struct lov_mds_md_v3    eti_lmm;
228         struct lov_user_md_v3   eti_lum;
229         struct md_attr          eti_ma;
230         struct lu_name          eti_lname;
231         /* per-thread values, can be re-used */
232         void                    *eti_big_lmm; /* may be vmalloc'd */
233         int                     eti_big_lmmsize;
234         char                    eti_name[ETI_NAME_LEN];
235         struct lu_buf           eti_buf;
236         /* If we want to test large ACL, then need to enlarge the buffer. */
237         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE_OLD];
238 #endif
239 };
240
241 /* No session used right now */
242 struct echo_session_info {
243         unsigned long dummy;
244 };
245
246 static struct kmem_cache *echo_lock_kmem;
247 static struct kmem_cache *echo_object_kmem;
248 static struct kmem_cache *echo_thread_kmem;
249 static struct kmem_cache *echo_session_kmem;
250 /* static struct kmem_cache *echo_req_kmem; */
251
252 static struct lu_kmem_descr echo_caches[] = {
253         {
254                 .ckd_cache = &echo_lock_kmem,
255                 .ckd_name  = "echo_lock_kmem",
256                 .ckd_size  = sizeof (struct echo_lock)
257         },
258         {
259                 .ckd_cache = &echo_object_kmem,
260                 .ckd_name  = "echo_object_kmem",
261                 .ckd_size  = sizeof (struct echo_object)
262         },
263         {
264                 .ckd_cache = &echo_thread_kmem,
265                 .ckd_name  = "echo_thread_kmem",
266                 .ckd_size  = sizeof (struct echo_thread_info)
267         },
268         {
269                 .ckd_cache = &echo_session_kmem,
270                 .ckd_name  = "echo_session_kmem",
271                 .ckd_size  = sizeof (struct echo_session_info)
272         },
273         {
274                 .ckd_cache = NULL
275         }
276 };
277
278 /** \defgroup echo_page Page operations
279  *
280  * Echo page operations.
281  *
282  * @{
283  */
284 static int echo_page_own(const struct lu_env *env,
285                          const struct cl_page_slice *slice,
286                          struct cl_io *io, int nonblock)
287 {
288         struct echo_page *ep = cl2echo_page(slice);
289
290         if (!nonblock)
291                 mutex_lock(&ep->ep_lock);
292         else if (!mutex_trylock(&ep->ep_lock))
293                 return -EAGAIN;
294         return 0;
295 }
296
297 static void echo_page_disown(const struct lu_env *env,
298                              const struct cl_page_slice *slice,
299                              struct cl_io *io)
300 {
301         struct echo_page *ep = cl2echo_page(slice);
302
303         LASSERT(mutex_is_locked(&ep->ep_lock));
304         mutex_unlock(&ep->ep_lock);
305 }
306
307 static void echo_page_discard(const struct lu_env *env,
308                               const struct cl_page_slice *slice,
309                               struct cl_io *unused)
310 {
311         cl_page_delete(env, slice->cpl_page);
312 }
313
314 static int echo_page_is_vmlocked(const struct lu_env *env,
315                                  const struct cl_page_slice *slice)
316 {
317         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
318                 return -EBUSY;
319         return -ENODATA;
320 }
321
322 static void echo_page_completion(const struct lu_env *env,
323                                  const struct cl_page_slice *slice,
324                                  int ioret)
325 {
326         LASSERT(slice->cpl_page->cp_sync_io != NULL);
327 }
328
329 static void echo_page_fini(const struct lu_env *env,
330                            struct cl_page_slice *slice)
331 {
332         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
333         ENTRY;
334
335         atomic_dec(&eco->eo_npages);
336         put_page(slice->cpl_page->cp_vmpage);
337         EXIT;
338 }
339
340 static int echo_page_prep(const struct lu_env *env,
341                           const struct cl_page_slice *slice,
342                           struct cl_io *unused)
343 {
344         return 0;
345 }
346
347 static int echo_page_print(const struct lu_env *env,
348                            const struct cl_page_slice *slice,
349                            void *cookie, lu_printer_t printer)
350 {
351         struct echo_page *ep = cl2echo_page(slice);
352
353         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
354                    ep, mutex_is_locked(&ep->ep_lock),
355                    slice->cpl_page->cp_vmpage);
356         return 0;
357 }
358
359 static const struct cl_page_operations echo_page_ops = {
360         .cpo_own           = echo_page_own,
361         .cpo_disown        = echo_page_disown,
362         .cpo_discard       = echo_page_discard,
363         .cpo_fini          = echo_page_fini,
364         .cpo_print         = echo_page_print,
365         .cpo_is_vmlocked   = echo_page_is_vmlocked,
366         .io = {
367                 [CRT_READ] = {
368                         .cpo_prep        = echo_page_prep,
369                         .cpo_completion  = echo_page_completion,
370                 },
371                 [CRT_WRITE] = {
372                         .cpo_prep        = echo_page_prep,
373                         .cpo_completion  = echo_page_completion,
374                 }
375         }
376 };
377 /** @} echo_page */
378
379 /** \defgroup echo_lock Locking
380  *
381  * echo lock operations
382  *
383  * @{
384  */
385 static void echo_lock_fini(const struct lu_env *env,
386                            struct cl_lock_slice *slice)
387 {
388         struct echo_lock *ecl = cl2echo_lock(slice);
389
390         LASSERT(list_empty(&ecl->el_chain));
391         OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
392 }
393
394 static struct cl_lock_operations echo_lock_ops = {
395         .clo_fini      = echo_lock_fini,
396 };
397
398 /** @} echo_lock */
399
400 /** \defgroup echo_cl_ops cl_object operations
401  *
402  * operations for cl_object
403  *
404  * @{
405  */
406 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
407                           struct cl_page *page, pgoff_t index)
408 {
409         struct echo_page *ep = cl_object_page_slice(obj, page);
410         struct echo_object *eco = cl2echo_obj(obj);
411         ENTRY;
412
413         get_page(page->cp_vmpage);
414         mutex_init(&ep->ep_lock);
415         cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
416         atomic_inc(&eco->eo_npages);
417         RETURN(0);
418 }
419
420 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
421                         struct cl_io *io)
422 {
423         return 0;
424 }
425
426 static int echo_lock_init(const struct lu_env *env,
427                           struct cl_object *obj, struct cl_lock *lock,
428                           const struct cl_io *unused)
429 {
430         struct echo_lock *el;
431         ENTRY;
432
433         OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
434         if (el != NULL) {
435                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
436                 el->el_object = cl2echo_obj(obj);
437                 INIT_LIST_HEAD(&el->el_chain);
438                 atomic_set(&el->el_refcount, 0);
439         }
440         RETURN(el == NULL ? -ENOMEM : 0);
441 }
442
443 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
444                          const struct cl_object_conf *conf)
445 {
446         return 0;
447 }
448
449 static const struct cl_object_operations echo_cl_obj_ops = {
450         .coo_page_init = echo_page_init,
451         .coo_lock_init = echo_lock_init,
452         .coo_io_init   = echo_io_init,
453         .coo_conf_set  = echo_conf_set
454 };
455 /** @} echo_cl_ops */
456
457 /** \defgroup echo_lu_ops lu_object operations
458  *
459  * operations for echo lu object.
460  *
461  * @{
462  */
463 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
464                             const struct lu_object_conf *conf)
465 {
466         struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
467         struct echo_client_obd *ec     = ed->ed_ec;
468         struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
469         ENTRY;
470
471         if (ed->ed_next) {
472                 struct lu_object  *below;
473                 struct lu_device  *under;
474
475                 under = ed->ed_next;
476                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
477                                                         under);
478                 if (below == NULL)
479                         RETURN(-ENOMEM);
480                 lu_object_add(obj, below);
481         }
482
483         if (!ed->ed_next_ismd) {
484                 const struct cl_object_conf *cconf = lu2cl_conf(conf);
485                 struct echo_object_conf *econf = cl2echo_conf(cconf);
486
487                 LASSERT(econf->eoc_oinfo != NULL);
488
489                 /* Transfer the oinfo pointer to eco that it won't be
490                  * freed. */
491                 eco->eo_oinfo = *econf->eoc_oinfo;
492                 *econf->eoc_oinfo = NULL;
493         } else {
494                 eco->eo_oinfo = NULL;
495         }
496
497         eco->eo_dev = ed;
498         atomic_set(&eco->eo_npages, 0);
499         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
500
501         spin_lock(&ec->ec_lock);
502         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
503         spin_unlock(&ec->ec_lock);
504
505         RETURN(0);
506 }
507
508 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
509 {
510         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
511         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
512         ENTRY;
513
514         LASSERT(atomic_read(&eco->eo_npages) == 0);
515
516         spin_lock(&ec->ec_lock);
517         list_del_init(&eco->eo_obj_chain);
518         spin_unlock(&ec->ec_lock);
519
520         lu_object_fini(obj);
521         lu_object_header_fini(obj->lo_header);
522
523         if (eco->eo_oinfo != NULL)
524                 OBD_FREE_PTR(eco->eo_oinfo);
525
526         OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
527         EXIT;
528 }
529
530 static int echo_object_print(const struct lu_env *env, void *cookie,
531                             lu_printer_t p, const struct lu_object *o)
532 {
533         struct echo_object *obj = cl2echo_obj(lu2cl(o));
534
535         return (*p)(env, cookie, "echoclient-object@%p", obj);
536 }
537
538 static const struct lu_object_operations echo_lu_obj_ops = {
539         .loo_object_init      = echo_object_init,
540         .loo_object_delete    = NULL,
541         .loo_object_release   = NULL,
542         .loo_object_free      = echo_object_free,
543         .loo_object_print     = echo_object_print,
544         .loo_object_invariant = NULL
545 };
546 /** @} echo_lu_ops */
547
548 /** \defgroup echo_lu_dev_ops  lu_device operations
549  *
550  * Operations for echo lu device.
551  *
552  * @{
553  */
554 static struct lu_object *echo_object_alloc(const struct lu_env *env,
555                                            const struct lu_object_header *hdr,
556                                            struct lu_device *dev)
557 {
558         struct echo_object *eco;
559         struct lu_object *obj = NULL;
560         ENTRY;
561
562         /* we're the top dev. */
563         LASSERT(hdr == NULL);
564         OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
565         if (eco != NULL) {
566                 struct cl_object_header *hdr = &eco->eo_hdr;
567
568                 obj = &echo_obj2cl(eco)->co_lu;
569                 cl_object_header_init(hdr);
570                 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
571
572                 lu_object_init(obj, &hdr->coh_lu, dev);
573                 lu_object_add_top(&hdr->coh_lu, obj);
574
575                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
576                 obj->lo_ops       = &echo_lu_obj_ops;
577         }
578         RETURN(obj);
579 }
580
581 static struct lu_device_operations echo_device_lu_ops = {
582         .ldo_object_alloc   = echo_object_alloc,
583 };
584
585 /** @} echo_lu_dev_ops */
586
587 /** \defgroup echo_init Setup and teardown
588  *
589  * Init and fini functions for echo client.
590  *
591  * @{
592  */
593 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
594 {
595         struct cl_site *site = &ed->ed_site_myself;
596         int rc;
597
598         /* initialize site */
599         rc = cl_site_init(site, &ed->ed_cl);
600         if (rc) {
601                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
602                 return rc;
603         }
604
605         rc = lu_site_init_finish(&site->cs_lu);
606         if (rc) {
607                 cl_site_fini(site);
608                 return rc;
609         }
610
611         ed->ed_site = &site->cs_lu;
612         return 0;
613 }
614
615 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
616 {
617         if (ed->ed_site) {
618                 if (!ed->ed_next_ismd)
619                         lu_site_fini(ed->ed_site);
620                 ed->ed_site = NULL;
621         }
622 }
623
624 static void *echo_thread_key_init(const struct lu_context *ctx,
625                                   struct lu_context_key *key)
626 {
627         struct echo_thread_info *info;
628
629         OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
630         if (info == NULL)
631                 info = ERR_PTR(-ENOMEM);
632         return info;
633 }
634
635 static void echo_thread_key_fini(const struct lu_context *ctx,
636                          struct lu_context_key *key, void *data)
637 {
638         struct echo_thread_info *info = data;
639         OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
640 }
641
642 static struct lu_context_key echo_thread_key = {
643         .lct_tags = LCT_CL_THREAD,
644         .lct_init = echo_thread_key_init,
645         .lct_fini = echo_thread_key_fini,
646 };
647
648 static void *echo_session_key_init(const struct lu_context *ctx,
649                                   struct lu_context_key *key)
650 {
651         struct echo_session_info *session;
652
653         OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
654         if (session == NULL)
655                 session = ERR_PTR(-ENOMEM);
656         return session;
657 }
658
659 static void echo_session_key_fini(const struct lu_context *ctx,
660                                  struct lu_context_key *key, void *data)
661 {
662         struct echo_session_info *session = data;
663         OBD_SLAB_FREE_PTR(session, echo_session_kmem);
664 }
665
666 static struct lu_context_key echo_session_key = {
667         .lct_tags = LCT_SESSION,
668         .lct_init = echo_session_key_init,
669         .lct_fini = echo_session_key_fini,
670 };
671
672 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
673
674 #ifdef HAVE_SERVER_SUPPORT
675 # define ECHO_SEQ_WIDTH 0xffffffff
676 static int echo_fid_init(struct echo_device *ed, char *obd_name,
677                          struct seq_server_site *ss)
678 {
679         char *prefix;
680         int rc;
681         ENTRY;
682
683         OBD_ALLOC_PTR(ed->ed_cl_seq);
684         if (ed->ed_cl_seq == NULL)
685                 RETURN(-ENOMEM);
686
687         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
688         if (prefix == NULL)
689                 GOTO(out_free_seq, rc = -ENOMEM);
690
691         snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
692
693         /* Init client side sequence-manager */
694         rc = seq_client_init(ed->ed_cl_seq, NULL,
695                              LUSTRE_SEQ_METADATA,
696                              prefix, ss->ss_server_seq);
697         ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
698         OBD_FREE(prefix, MAX_OBD_NAME + 5);
699         if (rc)
700                 GOTO(out_free_seq, rc);
701
702         RETURN(0);
703
704 out_free_seq:
705         OBD_FREE_PTR(ed->ed_cl_seq);
706         ed->ed_cl_seq = NULL;
707         RETURN(rc);
708 }
709
710 static int echo_fid_fini(struct obd_device *obddev)
711 {
712         struct echo_device *ed = obd2echo_dev(obddev);
713         ENTRY;
714
715         if (ed->ed_cl_seq != NULL) {
716                 seq_client_fini(ed->ed_cl_seq);
717                 OBD_FREE_PTR(ed->ed_cl_seq);
718                 ed->ed_cl_seq = NULL;
719         }
720
721         RETURN(0);
722 }
723
724 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
725 {
726         ENTRY;
727
728         if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
729                 local_oid_storage_fini(env, ed->ed_los);
730                 ed->ed_los = NULL;
731         }
732 }
733
734 static int
735 echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
736                           struct local_oid_storage *los,
737                           const struct lu_fid *pfid, const char *name,
738                           __u32 mode, struct lu_fid *fid)
739 {
740         struct dt_object        *parent = NULL;
741         struct dt_object        *dto = NULL;
742         int                      rc = 0;
743         ENTRY;
744
745         LASSERT(!fid_is_zero(pfid));
746         parent = dt_locate(env, emd->emd_bottom, pfid);
747         if (unlikely(IS_ERR(parent)))
748                 RETURN(PTR_ERR(parent));
749
750         /* create local file with @fid */
751         dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
752                                                  parent, name, mode);
753         if (IS_ERR(dto))
754                 GOTO(out_put, rc = PTR_ERR(dto));
755
756         *fid = *lu_object_fid(&dto->do_lu);
757         /* since stack is not fully set up the local_storage uses own stack
758          * and we should drop its object from cache */
759         dt_object_put_nocache(env, dto);
760
761         EXIT;
762 out_put:
763         dt_object_put(env, parent);
764         RETURN(rc);
765 }
766
767 static int
768 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
769                  struct echo_device *ed)
770 {
771         struct lu_fid                    fid;
772         int                              rc = 0;
773         ENTRY;
774
775         /* Setup local dirs */
776         fid.f_seq = FID_SEQ_LOCAL_NAME;
777         fid.f_oid = 1;
778         fid.f_ver = 0;
779         rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
780         if (rc != 0)
781                 RETURN(rc);
782
783         lu_echo_root_fid(&fid);
784         if (echo_md_seq_site(emd)->ss_node_id == 0) {
785                 rc = echo_md_local_file_create(env, emd, ed->ed_los,
786                                                &emd->emd_local_root_fid,
787                                                echo_md_root_dir_name, S_IFDIR |
788                                                S_IRUGO | S_IWUSR | S_IXUGO,
789                                                &fid);
790                 if (rc != 0) {
791                         CERROR("%s: create md echo root fid failed: rc = %d\n",
792                                emd2obd_dev(emd)->obd_name, rc);
793                         GOTO(out_los, rc);
794                 }
795         }
796         ed->ed_root_fid = fid;
797
798         RETURN(0);
799 out_los:
800         echo_ed_los_fini(env, ed);
801
802         RETURN(rc);
803 }
804 #endif /* HAVE_SERVER_SUPPORT */
805
806 static struct lu_device *echo_device_alloc(const struct lu_env *env,
807                                            struct lu_device_type *t,
808                                            struct lustre_cfg *cfg)
809 {
810         struct lu_device   *next;
811         struct echo_device *ed;
812         struct cl_device   *cd;
813         struct obd_device  *obd = NULL; /* to keep compiler happy */
814         struct obd_device  *tgt;
815         const char *tgt_type_name;
816         int rc;
817         int cleanup = 0;
818         ENTRY;
819
820         OBD_ALLOC_PTR(ed);
821         if (ed == NULL)
822                 GOTO(out, rc = -ENOMEM);
823
824         cleanup = 1;
825         cd = &ed->ed_cl;
826         rc = cl_device_init(cd, t);
827         if (rc)
828                 GOTO(out, rc);
829
830         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
831
832         cleanup = 2;
833         obd = class_name2obd(lustre_cfg_string(cfg, 0));
834         LASSERT(obd != NULL);
835         LASSERT(env != NULL);
836
837         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
838         if (tgt == NULL) {
839                 CERROR("Can not find tgt device %s\n",
840                         lustre_cfg_string(cfg, 1));
841                 GOTO(out, rc = -ENODEV);
842         }
843
844         next = tgt->obd_lu_dev;
845
846         if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
847                 ed->ed_next_ismd = 1;
848         } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
849                    strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
850                 ed->ed_next_ismd = 0;
851                 rc = echo_site_init(env, ed);
852                 if (rc)
853                         GOTO(out, rc);
854         } else {
855                 GOTO(out, rc = -EINVAL);
856         }
857
858         cleanup = 3;
859
860         rc = echo_client_setup(env, obd, cfg);
861         if (rc)
862                 GOTO(out, rc);
863
864         ed->ed_ec = &obd->u.echo_client;
865         cleanup = 4;
866
867         if (ed->ed_next_ismd) {
868 #ifdef HAVE_SERVER_SUPPORT
869                 /* Suppose to connect to some Metadata layer */
870                 struct lu_site          *ls = NULL;
871                 struct lu_device        *ld = NULL;
872                 struct md_device        *md = NULL;
873                 struct echo_md_device   *emd = NULL;
874                 int                      found = 0;
875
876                 if (next == NULL) {
877                         CERROR("%s is not lu device type!\n",
878                                lustre_cfg_string(cfg, 1));
879                         GOTO(out, rc = -EINVAL);
880                 }
881
882                 tgt_type_name = lustre_cfg_string(cfg, 2);
883                 if (!tgt_type_name) {
884                         CERROR("%s no type name for echo %s setup\n",
885                                 lustre_cfg_string(cfg, 1),
886                                 tgt->obd_type->typ_name);
887                         GOTO(out, rc = -EINVAL);
888                 }
889
890                 ls = next->ld_site;
891
892                 spin_lock(&ls->ls_ld_lock);
893                 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
894                         if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
895                                 found = 1;
896                                 break;
897                         }
898                 }
899                 spin_unlock(&ls->ls_ld_lock);
900
901                 if (found == 0) {
902                         CERROR("%s is not lu device type!\n",
903                                lustre_cfg_string(cfg, 1));
904                         GOTO(out, rc = -EINVAL);
905                 }
906
907                 next = ld;
908                 /* For MD echo client, it will use the site in MDS stack */
909                 ed->ed_site = ls;
910                 ed->ed_cl.cd_lu_dev.ld_site = ls;
911                 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
912                 if (rc) {
913                         CERROR("echo fid init error %d\n", rc);
914                         GOTO(out, rc);
915                 }
916
917                 md = lu2md_dev(next);
918                 emd = lu2emd_dev(&md->md_lu_dev);
919                 rc = echo_md_root_get(env, emd, ed);
920                 if (rc != 0) {
921                         CERROR("%s: get root error: rc = %d\n",
922                                 emd2obd_dev(emd)->obd_name, rc);
923                         GOTO(out, rc);
924                 }
925 #else /* !HAVE_SERVER_SUPPORT */
926                 CERROR("Local operations are NOT supported on client side. "
927                        "Only remote operations are supported. Metadata client "
928                        "must be run on server side.\n");
929                 GOTO(out, rc = -EOPNOTSUPP);
930 #endif /* HAVE_SERVER_SUPPORT */
931         } else {
932                  /* if echo client is to be stacked upon ost device, the next is
933                   * NULL since ost is not a clio device so far */
934                 if (next != NULL && !lu_device_is_cl(next))
935                         next = NULL;
936
937                 tgt_type_name = tgt->obd_type->typ_name;
938                 if (next != NULL) {
939                         LASSERT(next != NULL);
940                         if (next->ld_site != NULL)
941                                 GOTO(out, rc = -EBUSY);
942
943                         next->ld_site = ed->ed_site;
944                         rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
945                                                      next->ld_type->ldt_name,
946                                                      NULL);
947                         if (rc)
948                                 GOTO(out, rc);
949                 } else
950                         LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
951         }
952
953         ed->ed_next = next;
954         RETURN(&cd->cd_lu_dev);
955 out:
956         switch(cleanup) {
957         case 4: {
958                 int rc2;
959                 rc2 = echo_client_cleanup(obd);
960                 if (rc2)
961                         CERROR("Cleanup obd device %s error(%d)\n",
962                                obd->obd_name, rc2);
963         }
964
965         case 3:
966                 echo_site_fini(env, ed);
967         case 2:
968                 cl_device_fini(&ed->ed_cl);
969         case 1:
970                 OBD_FREE_PTR(ed);
971         case 0:
972         default:
973                 break;
974         }
975         return(ERR_PTR(rc));
976 }
977
978 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
979                           const char *name, struct lu_device *next)
980 {
981         LBUG();
982         return 0;
983 }
984
985 static struct lu_device *echo_device_fini(const struct lu_env *env,
986                                           struct lu_device *d)
987 {
988         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
989         struct lu_device *next = ed->ed_next;
990
991         while (next && !ed->ed_next_ismd)
992                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
993         return NULL;
994 }
995
996 static void echo_lock_release(const struct lu_env *env,
997                               struct echo_lock *ecl,
998                               int still_used)
999 {
1000         struct cl_lock *clk = echo_lock2cl(ecl);
1001
1002         cl_lock_release(env, clk);
1003 }
1004
1005 static struct lu_device *echo_device_free(const struct lu_env *env,
1006                                           struct lu_device *d)
1007 {
1008         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
1009         struct echo_client_obd *ec   = ed->ed_ec;
1010         struct echo_object     *eco;
1011         struct lu_device       *next = ed->ed_next;
1012
1013         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
1014                ed, next);
1015
1016         lu_site_purge(env, ed->ed_site, -1);
1017
1018         /* check if there are objects still alive.
1019          * It shouldn't have any object because lu_site_purge would cleanup
1020          * all of cached objects. Anyway, probably the echo device is being
1021          * parallelly accessed.
1022          */
1023         spin_lock(&ec->ec_lock);
1024         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
1025                 eco->eo_deleted = 1;
1026         spin_unlock(&ec->ec_lock);
1027
1028         /* purge again */
1029         lu_site_purge(env, ed->ed_site, -1);
1030
1031         CDEBUG(D_INFO,
1032                "Waiting for the reference of echo object to be dropped\n");
1033
1034         /* Wait for the last reference to be dropped. */
1035         spin_lock(&ec->ec_lock);
1036         while (!list_empty(&ec->ec_objects)) {
1037                 spin_unlock(&ec->ec_lock);
1038                 CERROR("echo_client still has objects at cleanup time, "
1039                        "wait for 1 second\n");
1040                 set_current_state(TASK_UNINTERRUPTIBLE);
1041                 schedule_timeout(cfs_time_seconds(1));
1042                 lu_site_purge(env, ed->ed_site, -1);
1043                 spin_lock(&ec->ec_lock);
1044         }
1045         spin_unlock(&ec->ec_lock);
1046
1047         LASSERT(list_empty(&ec->ec_locks));
1048
1049         CDEBUG(D_INFO, "No object exists, exiting...\n");
1050
1051         echo_client_cleanup(d->ld_obd);
1052 #ifdef HAVE_SERVER_SUPPORT
1053         echo_fid_fini(d->ld_obd);
1054         echo_ed_los_fini(env, ed);
1055 #endif
1056         while (next && !ed->ed_next_ismd)
1057                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
1058
1059         LASSERT(ed->ed_site == d->ld_site);
1060         echo_site_fini(env, ed);
1061         cl_device_fini(&ed->ed_cl);
1062         OBD_FREE_PTR(ed);
1063
1064         cl_env_cache_purge(~0);
1065
1066         return NULL;
1067 }
1068
1069 static const struct lu_device_type_operations echo_device_type_ops = {
1070         .ldto_init = echo_type_init,
1071         .ldto_fini = echo_type_fini,
1072
1073         .ldto_start = echo_type_start,
1074         .ldto_stop  = echo_type_stop,
1075
1076         .ldto_device_alloc = echo_device_alloc,
1077         .ldto_device_free  = echo_device_free,
1078         .ldto_device_init  = echo_device_init,
1079         .ldto_device_fini  = echo_device_fini
1080 };
1081
1082 static struct lu_device_type echo_device_type = {
1083         .ldt_tags     = LU_DEVICE_CL,
1084         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
1085         .ldt_ops      = &echo_device_type_ops,
1086         .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1087 };
1088 /** @} echo_init */
1089
1090 /** \defgroup echo_exports Exported operations
1091  *
1092  * exporting functions to echo client
1093  *
1094  * @{
1095  */
1096
1097 /* Interfaces to echo client obd device */
1098 static struct echo_object *
1099 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
1100 {
1101         struct lu_env *env;
1102         struct echo_thread_info *info;
1103         struct echo_object_conf *conf;
1104         struct echo_object *eco;
1105         struct cl_object *obj;
1106         struct lov_oinfo *oinfo = NULL;
1107         struct lu_fid *fid;
1108         __u16  refcheck;
1109         int rc;
1110         ENTRY;
1111
1112         LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
1113         LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
1114
1115         /* Never return an object if the obd is to be freed. */
1116         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1117                 RETURN(ERR_PTR(-ENODEV));
1118
1119         env = cl_env_get(&refcheck);
1120         if (IS_ERR(env))
1121                 RETURN((void *)env);
1122
1123         info = echo_env_info(env);
1124         conf = &info->eti_conf;
1125         if (d->ed_next) {
1126                 OBD_ALLOC_PTR(oinfo);
1127                 if (oinfo == NULL)
1128                         GOTO(out, eco = ERR_PTR(-ENOMEM));
1129
1130                 oinfo->loi_oi = *oi;
1131                 conf->eoc_cl.u.coc_oinfo = oinfo;
1132         }
1133
1134         /* If echo_object_init() is successful then ownership of oinfo
1135          * is transferred to the object. */
1136         conf->eoc_oinfo = &oinfo;
1137
1138         fid = &info->eti_fid;
1139         rc = ostid_to_fid(fid, oi, 0);
1140         if (rc != 0)
1141                 GOTO(out, eco = ERR_PTR(rc));
1142
1143         /* In the function below, .hs_keycmp resolves to
1144          * lu_obj_hop_keycmp() */
1145         /* coverity[overrun-buffer-val] */
1146         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1147         if (IS_ERR(obj))
1148                 GOTO(out, eco = (void*)obj);
1149
1150         eco = cl2echo_obj(obj);
1151         if (eco->eo_deleted) {
1152                 cl_object_put(env, obj);
1153                 eco = ERR_PTR(-EAGAIN);
1154         }
1155
1156 out:
1157         if (oinfo != NULL)
1158                 OBD_FREE_PTR(oinfo);
1159
1160         cl_env_put(env, &refcheck);
1161         RETURN(eco);
1162 }
1163
1164 static int cl_echo_object_put(struct echo_object *eco)
1165 {
1166         struct lu_env *env;
1167         struct cl_object *obj = echo_obj2cl(eco);
1168         __u16  refcheck;
1169         ENTRY;
1170
1171         env = cl_env_get(&refcheck);
1172         if (IS_ERR(env))
1173                 RETURN(PTR_ERR(env));
1174
1175         /* an external function to kill an object? */
1176         if (eco->eo_deleted) {
1177                 struct lu_object_header *loh = obj->co_lu.lo_header;
1178                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1179                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1180         }
1181
1182         cl_object_put(env, obj);
1183         cl_env_put(env, &refcheck);
1184         RETURN(0);
1185 }
1186
1187 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1188                             u64 start, u64 end, int mode,
1189                             __u64 *cookie , __u32 enqflags)
1190 {
1191         struct cl_io *io;
1192         struct cl_lock *lck;
1193         struct cl_object *obj;
1194         struct cl_lock_descr *descr;
1195         struct echo_thread_info *info;
1196         int rc = -ENOMEM;
1197         ENTRY;
1198
1199         info = echo_env_info(env);
1200         io = &info->eti_io;
1201         lck = &info->eti_lock;
1202         obj = echo_obj2cl(eco);
1203
1204         memset(lck, 0, sizeof(*lck));
1205         descr = &lck->cll_descr;
1206         descr->cld_obj   = obj;
1207         descr->cld_start = cl_index(obj, start);
1208         descr->cld_end   = cl_index(obj, end);
1209         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1210         descr->cld_enq_flags = enqflags;
1211         io->ci_obj = obj;
1212
1213         rc = cl_lock_request(env, io, lck);
1214         if (rc == 0) {
1215                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1216                 struct echo_lock *el;
1217
1218                 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1219                 spin_lock(&ec->ec_lock);
1220                 if (list_empty(&el->el_chain)) {
1221                         list_add(&el->el_chain, &ec->ec_locks);
1222                         el->el_cookie = ++ec->ec_unique;
1223                 }
1224                 atomic_inc(&el->el_refcount);
1225                 *cookie = el->el_cookie;
1226                 spin_unlock(&ec->ec_lock);
1227         }
1228         RETURN(rc);
1229 }
1230
1231 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1232                            __u64 cookie)
1233 {
1234         struct echo_client_obd *ec = ed->ed_ec;
1235         struct echo_lock       *ecl = NULL;
1236         struct list_head        *el;
1237         int found = 0, still_used = 0;
1238         ENTRY;
1239
1240         LASSERT(ec != NULL);
1241         spin_lock(&ec->ec_lock);
1242         list_for_each(el, &ec->ec_locks) {
1243                 ecl = list_entry(el, struct echo_lock, el_chain);
1244                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1245                 found = (ecl->el_cookie == cookie);
1246                 if (found) {
1247                         if (atomic_dec_and_test(&ecl->el_refcount))
1248                                 list_del_init(&ecl->el_chain);
1249                         else
1250                                 still_used = 1;
1251                         break;
1252                 }
1253         }
1254         spin_unlock(&ec->ec_lock);
1255
1256         if (!found)
1257                 RETURN(-ENOENT);
1258
1259         echo_lock_release(env, ecl, still_used);
1260         RETURN(0);
1261 }
1262
1263 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1264                                 struct cl_page *page)
1265 {
1266         struct echo_thread_info *info;
1267         struct cl_2queue        *queue;
1268
1269         info = echo_env_info(env);
1270         LASSERT(io == &info->eti_io);
1271
1272         queue = &info->eti_queue;
1273         cl_page_list_add(&queue->c2_qout, page);
1274 }
1275
1276 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1277                               struct page **pages, int npages, int async)
1278 {
1279         struct lu_env           *env;
1280         struct echo_thread_info *info;
1281         struct cl_object        *obj = echo_obj2cl(eco);
1282         struct echo_device      *ed  = eco->eo_dev;
1283         struct cl_2queue        *queue;
1284         struct cl_io            *io;
1285         struct cl_page          *clp;
1286         struct lustre_handle    lh = { 0 };
1287         int page_size = cl_page_size(obj);
1288         int rc;
1289         int i;
1290         __u16 refcheck;
1291         ENTRY;
1292
1293         LASSERT((offset & ~PAGE_MASK) == 0);
1294         LASSERT(ed->ed_next != NULL);
1295         env = cl_env_get(&refcheck);
1296         if (IS_ERR(env))
1297                 RETURN(PTR_ERR(env));
1298
1299         info    = echo_env_info(env);
1300         io      = &info->eti_io;
1301         queue   = &info->eti_queue;
1302
1303         cl_2queue_init(queue);
1304
1305         io->ci_ignore_layout = 1;
1306         rc = cl_io_init(env, io, CIT_MISC, obj);
1307         if (rc < 0)
1308                 GOTO(out, rc);
1309         LASSERT(rc == 0);
1310
1311
1312         rc = cl_echo_enqueue0(env, eco, offset,
1313                               offset + npages * PAGE_SIZE - 1,
1314                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1315                               CEF_NEVER);
1316         if (rc < 0)
1317                 GOTO(error_lock, rc);
1318
1319         for (i = 0; i < npages; i++) {
1320                 LASSERT(pages[i]);
1321                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1322                                    pages[i], CPT_TRANSIENT);
1323                 if (IS_ERR(clp)) {
1324                         rc = PTR_ERR(clp);
1325                         break;
1326                 }
1327                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1328
1329                 rc = cl_page_own(env, io, clp);
1330                 if (rc) {
1331                         LASSERT(clp->cp_state == CPS_FREEING);
1332                         cl_page_put(env, clp);
1333                         break;
1334                 }
1335
1336                 cl_2queue_add(queue, clp);
1337
1338                 /* drop the reference count for cl_page_find, so that the page
1339                  * will be freed in cl_2queue_fini. */
1340                 cl_page_put(env, clp);
1341                 cl_page_clip(env, clp, 0, page_size);
1342
1343                 offset += page_size;
1344         }
1345
1346         if (rc == 0) {
1347                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1348
1349                 async = async && (typ == CRT_WRITE);
1350                 if (async)
1351                         rc = cl_io_commit_async(env, io, &queue->c2_qin,
1352                                                 0, PAGE_SIZE,
1353                                                 echo_commit_callback);
1354                 else
1355                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1356                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1357                        async ? "async" : "sync", rc);
1358         }
1359
1360         cl_echo_cancel0(env, ed, lh.cookie);
1361         EXIT;
1362 error_lock:
1363         cl_2queue_discard(env, io, queue);
1364         cl_2queue_disown(env, io, queue);
1365         cl_2queue_fini(env, queue);
1366         cl_io_fini(env, io);
1367 out:
1368         cl_env_put(env, &refcheck);
1369         return rc;
1370 }
1371 /** @} echo_exports */
1372
1373
1374 static u64 last_object_id;
1375
1376 #ifdef HAVE_SERVER_SUPPORT
1377 static inline void echo_md_build_name(struct lu_name *lname, char *name,
1378                                       __u64 id)
1379 {
1380         snprintf(name, ETI_NAME_LEN, "%llu", id);
1381         lname->ln_name = name;
1382         lname->ln_namelen = strlen(name);
1383 }
1384
1385 /* similar to mdt_attr_get_complex */
1386 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1387                             struct md_attr *ma)
1388 {
1389         struct echo_thread_info *info = echo_env_info(env);
1390         int                      rc;
1391
1392         ENTRY;
1393
1394         LASSERT(ma->ma_lmm_size > 0);
1395
1396         rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1397         if (rc < 0)
1398                 RETURN(rc);
1399
1400         /* big_lmm may need to be grown */
1401         if (info->eti_big_lmmsize < rc) {
1402                 int size = size_roundup_power2(rc);
1403
1404                 if (info->eti_big_lmmsize > 0) {
1405                         /* free old buffer */
1406                         LASSERT(info->eti_big_lmm);
1407                         OBD_FREE_LARGE(info->eti_big_lmm,
1408                                        info->eti_big_lmmsize);
1409                         info->eti_big_lmm = NULL;
1410                         info->eti_big_lmmsize = 0;
1411                 }
1412
1413                 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1414                 if (info->eti_big_lmm == NULL)
1415                         RETURN(-ENOMEM);
1416                 info->eti_big_lmmsize = size;
1417         }
1418         LASSERT(info->eti_big_lmmsize >= rc);
1419
1420         info->eti_buf.lb_buf = info->eti_big_lmm;
1421         info->eti_buf.lb_len = info->eti_big_lmmsize;
1422         rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1423         if (rc < 0)
1424                 RETURN(rc);
1425
1426         ma->ma_valid |= MA_LOV;
1427         ma->ma_lmm = info->eti_big_lmm;
1428         ma->ma_lmm_size = rc;
1429
1430         RETURN(0);
1431 }
1432
1433 static int echo_attr_get_complex(const struct lu_env *env,
1434                                  struct md_object *next,
1435                                  struct md_attr *ma)
1436 {
1437         struct echo_thread_info *info = echo_env_info(env);
1438         struct lu_buf           *buf = &info->eti_buf;
1439         umode_t          mode = lu_object_attr(&next->mo_lu);
1440         int                      need = ma->ma_need;
1441         int                      rc = 0, rc2;
1442
1443         ENTRY;
1444
1445         ma->ma_valid = 0;
1446
1447         if (need & MA_INODE) {
1448                 ma->ma_need = MA_INODE;
1449                 rc = mo_attr_get(env, next, ma);
1450                 if (rc)
1451                         GOTO(out, rc);
1452                 ma->ma_valid |= MA_INODE;
1453         }
1454
1455         if (need & MA_LOV) {
1456                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1457                         LASSERT(ma->ma_lmm_size > 0);
1458                         buf->lb_buf = ma->ma_lmm;
1459                         buf->lb_len = ma->ma_lmm_size;
1460                         rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1461                         if (rc2 > 0) {
1462                                 ma->ma_lmm_size = rc2;
1463                                 ma->ma_valid |= MA_LOV;
1464                         } else if (rc2 == -ENODATA) {
1465                                 /* no LOV EA */
1466                                 ma->ma_lmm_size = 0;
1467                         } else if (rc2 == -ERANGE) {
1468                                 rc2 = echo_big_lmm_get(env, next, ma);
1469                                 if (rc2 < 0)
1470                                         GOTO(out, rc = rc2);
1471                         } else {
1472                                 GOTO(out, rc = rc2);
1473                         }
1474                 }
1475         }
1476
1477 #ifdef CONFIG_FS_POSIX_ACL
1478         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1479                 buf->lb_buf = ma->ma_acl;
1480                 buf->lb_len = ma->ma_acl_size;
1481                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1482                 if (rc2 > 0) {
1483                         ma->ma_acl_size = rc2;
1484                         ma->ma_valid |= MA_ACL_DEF;
1485                 } else if (rc2 == -ENODATA) {
1486                         /* no ACLs */
1487                         ma->ma_acl_size = 0;
1488                 } else {
1489                         GOTO(out, rc = rc2);
1490                 }
1491         }
1492 #endif
1493 out:
1494         ma->ma_need = need;
1495         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1496                rc, ma->ma_valid, ma->ma_lmm);
1497         RETURN(rc);
1498 }
1499
1500 static int
1501 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1502                         struct md_object *parent, struct lu_fid *fid,
1503                         struct lu_name *lname, struct md_op_spec *spec,
1504                         struct md_attr *ma)
1505 {
1506         struct lu_object        *ec_child, *child;
1507         struct lu_device        *ld = ed->ed_next;
1508         struct echo_thread_info *info = echo_env_info(env);
1509         struct lu_fid           *fid2 = &info->eti_fid2;
1510         struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
1511         int                      rc;
1512
1513         ENTRY;
1514
1515         rc = mdo_lookup(env, parent, lname, fid2, spec);
1516         if (rc == 0)
1517                 return -EEXIST;
1518         else if (rc != -ENOENT)
1519                 return rc;
1520
1521         ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1522                                      fid, &conf);
1523         if (IS_ERR(ec_child)) {
1524                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1525                         PTR_ERR(ec_child));
1526                 RETURN(PTR_ERR(ec_child));
1527         }
1528
1529         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1530         if (child == NULL) {
1531                 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1532                 GOTO(out_put, rc = -EINVAL);
1533         }
1534
1535         CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1536                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1537
1538         /*
1539          * Do not perform lookup sanity check. We know that name does not exist.
1540          */
1541         spec->sp_cr_lookup = 0;
1542         rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1543         if (rc) {
1544                 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1545                 GOTO(out_put, rc);
1546         }
1547         CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
1548                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1549         EXIT;
1550 out_put:
1551         lu_object_put(env, ec_child);
1552         return rc;
1553 }
1554
1555 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1556                              struct md_attr *ma)
1557 {
1558         struct echo_thread_info *info = echo_env_info(env);
1559
1560         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1561                 ma->ma_lmm = (void *)&info->eti_lmm;
1562                 ma->ma_lmm_size = sizeof(info->eti_lmm);
1563         } else {
1564                 LASSERT(info->eti_big_lmmsize);
1565                 ma->ma_lmm = info->eti_big_lmm;
1566                 ma->ma_lmm_size = info->eti_big_lmmsize;
1567         }
1568
1569         return 0;
1570 }
1571
1572 static int echo_create_md_object(const struct lu_env *env,
1573                                  struct echo_device *ed,
1574                                  struct lu_object *ec_parent,
1575                                  struct lu_fid *fid,
1576                                  char *name, int namelen,
1577                                  __u64 id, __u32 mode, int count,
1578                                  int stripe_count, int stripe_offset)
1579 {
1580         struct lu_object        *parent;
1581         struct echo_thread_info *info = echo_env_info(env);
1582         struct lu_name          *lname = &info->eti_lname;
1583         struct md_op_spec       *spec = &info->eti_spec;
1584         struct md_attr          *ma = &info->eti_ma;
1585         struct lu_device        *ld = ed->ed_next;
1586         int                      rc = 0;
1587         int                      i;
1588
1589         ENTRY;
1590
1591         if (ec_parent == NULL)
1592                 return -1;
1593         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1594         if (parent == NULL)
1595                 RETURN(-ENXIO);
1596
1597         memset(ma, 0, sizeof(*ma));
1598         memset(spec, 0, sizeof(*spec));
1599         if (stripe_count != 0) {
1600                 spec->sp_cr_flags |= FMODE_WRITE;
1601                 echo_set_lmm_size(env, ld, ma);
1602                 if (stripe_count != -1) {
1603                         struct lov_user_md_v3 *lum = &info->eti_lum;
1604
1605                         lum->lmm_magic = LOV_USER_MAGIC_V3;
1606                         lum->lmm_stripe_count = stripe_count;
1607                         lum->lmm_stripe_offset = stripe_offset;
1608                         lum->lmm_pattern = 0;
1609                         spec->u.sp_ea.eadata = lum;
1610                         spec->u.sp_ea.eadatalen = sizeof(*lum);
1611                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1612                 }
1613         }
1614
1615         ma->ma_attr.la_mode = mode;
1616         ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1617         ma->ma_attr.la_ctime = cfs_time_current_64();
1618
1619         if (name != NULL) {
1620                 lname->ln_name = name;
1621                 lname->ln_namelen = namelen;
1622                 /* If name is specified, only create one object by name */
1623                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1624                                              spec, ma);
1625                 RETURN(rc);
1626         }
1627
1628         /* Create multiple object sequenced by id */
1629         for (i = 0; i < count; i++) {
1630                 char *tmp_name = info->eti_name;
1631
1632                 echo_md_build_name(lname, tmp_name, id);
1633
1634                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1635                                              spec, ma);
1636                 if (rc) {
1637                         CERROR("Can not create child %s: rc = %d\n", tmp_name,
1638                                 rc);
1639                         break;
1640                 }
1641                 id++;
1642                 fid->f_oid++;
1643         }
1644
1645         RETURN(rc);
1646 }
1647
1648 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1649                                         struct echo_device *ed,
1650                                         struct md_object *parent,
1651                                         struct lu_name *lname)
1652 {
1653         struct echo_thread_info *info = echo_env_info(env);
1654         struct lu_fid           *fid = &info->eti_fid;
1655         struct lu_object        *child;
1656         int    rc;
1657         ENTRY;
1658
1659         CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1660                PFID(fid), parent);
1661         rc = mdo_lookup(env, parent, lname, fid, NULL);
1662         if (rc) {
1663                 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1664                 RETURN(ERR_PTR(rc));
1665         }
1666
1667         /* In the function below, .hs_keycmp resolves to
1668          * lu_obj_hop_keycmp() */
1669         /* coverity[overrun-buffer-val] */
1670         child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1671
1672         RETURN(child);
1673 }
1674
1675 static int echo_setattr_object(const struct lu_env *env,
1676                                struct echo_device *ed,
1677                                struct lu_object *ec_parent,
1678                                __u64 id, int count)
1679 {
1680         struct lu_object        *parent;
1681         struct echo_thread_info *info = echo_env_info(env);
1682         struct lu_name          *lname = &info->eti_lname;
1683         char                    *name = info->eti_name;
1684         struct lu_device        *ld = ed->ed_next;
1685         struct lu_buf           *buf = &info->eti_buf;
1686         int                      rc = 0;
1687         int                      i;
1688
1689         ENTRY;
1690
1691         if (ec_parent == NULL)
1692                 return -1;
1693         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1694         if (parent == NULL)
1695                 RETURN(-ENXIO);
1696
1697         for (i = 0; i < count; i++) {
1698                 struct lu_object *ec_child, *child;
1699
1700                 echo_md_build_name(lname, name, id);
1701
1702                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1703                 if (IS_ERR(ec_child)) {
1704                         CERROR("Can't find child %s: rc = %ld\n",
1705                                 lname->ln_name, PTR_ERR(ec_child));
1706                         RETURN(PTR_ERR(ec_child));
1707                 }
1708
1709                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1710                 if (child == NULL) {
1711                         CERROR("Can not locate the child %s\n", lname->ln_name);
1712                         lu_object_put(env, ec_child);
1713                         rc = -EINVAL;
1714                         break;
1715                 }
1716
1717                 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1718                        PFID(lu_object_fid(child)));
1719
1720                 buf->lb_buf = info->eti_xattr_buf;
1721                 buf->lb_len = sizeof(info->eti_xattr_buf);
1722
1723                 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1724                 rc = mo_xattr_set(env, lu2md(child), buf, name,
1725                                   LU_XATTR_CREATE);
1726                 if (rc < 0) {
1727                         CERROR("Can not setattr child "DFID": rc = %d\n",
1728                                 PFID(lu_object_fid(child)), rc);
1729                         lu_object_put(env, ec_child);
1730                         break;
1731                 }
1732                 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1733                        PFID(lu_object_fid(child)));
1734                 id++;
1735                 lu_object_put(env, ec_child);
1736         }
1737         RETURN(rc);
1738 }
1739
1740 static int echo_getattr_object(const struct lu_env *env,
1741                                struct echo_device *ed,
1742                                struct lu_object *ec_parent,
1743                                __u64 id, int count)
1744 {
1745         struct lu_object        *parent;
1746         struct echo_thread_info *info = echo_env_info(env);
1747         struct lu_name          *lname = &info->eti_lname;
1748         char                    *name = info->eti_name;
1749         struct md_attr          *ma = &info->eti_ma;
1750         struct lu_device        *ld = ed->ed_next;
1751         int                      rc = 0;
1752         int                      i;
1753
1754         ENTRY;
1755
1756         if (ec_parent == NULL)
1757                 return -1;
1758         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1759         if (parent == NULL)
1760                 RETURN(-ENXIO);
1761
1762         memset(ma, 0, sizeof(*ma));
1763         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1764         ma->ma_acl = info->eti_xattr_buf;
1765         ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1766
1767         for (i = 0; i < count; i++) {
1768                 struct lu_object *ec_child, *child;
1769
1770                 ma->ma_valid = 0;
1771                 echo_md_build_name(lname, name, id);
1772                 echo_set_lmm_size(env, ld, ma);
1773
1774                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1775                 if (IS_ERR(ec_child)) {
1776                         CERROR("Can't find child %s: rc = %ld\n",
1777                                lname->ln_name, PTR_ERR(ec_child));
1778                         RETURN(PTR_ERR(ec_child));
1779                 }
1780
1781                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1782                 if (child == NULL) {
1783                         CERROR("Can not locate the child %s\n", lname->ln_name);
1784                         lu_object_put(env, ec_child);
1785                         RETURN(-EINVAL);
1786                 }
1787
1788                 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1789                        PFID(lu_object_fid(child)));
1790                 rc = echo_attr_get_complex(env, lu2md(child), ma);
1791                 if (rc) {
1792                         CERROR("Can not getattr child "DFID": rc = %d\n",
1793                                 PFID(lu_object_fid(child)), rc);
1794                         lu_object_put(env, ec_child);
1795                         break;
1796                 }
1797                 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1798                        PFID(lu_object_fid(child)));
1799                 id++;
1800                 lu_object_put(env, ec_child);
1801         }
1802
1803         RETURN(rc);
1804 }
1805
1806 static int echo_lookup_object(const struct lu_env *env,
1807                               struct echo_device *ed,
1808                               struct lu_object *ec_parent,
1809                               __u64 id, int count)
1810 {
1811         struct lu_object        *parent;
1812         struct echo_thread_info *info = echo_env_info(env);
1813         struct lu_name          *lname = &info->eti_lname;
1814         char                    *name = info->eti_name;
1815         struct lu_fid           *fid = &info->eti_fid;
1816         struct lu_device        *ld = ed->ed_next;
1817         int                      rc = 0;
1818         int                      i;
1819
1820         if (ec_parent == NULL)
1821                 return -1;
1822         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1823         if (parent == NULL)
1824                 return -ENXIO;
1825
1826         /*prepare the requests*/
1827         for (i = 0; i < count; i++) {
1828                 echo_md_build_name(lname, name, id);
1829
1830                 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1831                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1832
1833                 rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
1834                 if (rc) {
1835                         CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1836                         break;
1837                 }
1838                 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1839                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1840
1841                 id++;
1842         }
1843         return rc;
1844 }
1845
1846 static int echo_md_destroy_internal(const struct lu_env *env,
1847                                     struct echo_device *ed,
1848                                     struct md_object *parent,
1849                                     struct lu_name *lname,
1850                                     struct md_attr *ma)
1851 {
1852         struct lu_device   *ld = ed->ed_next;
1853         struct lu_object   *ec_child;
1854         struct lu_object   *child;
1855         int                 rc;
1856
1857         ENTRY;
1858
1859         ec_child = echo_md_lookup(env, ed, parent, lname);
1860         if (IS_ERR(ec_child)) {
1861                 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1862                         PTR_ERR(ec_child));
1863                 RETURN(PTR_ERR(ec_child));
1864         }
1865
1866         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1867         if (child == NULL) {
1868                 CERROR("Can not locate the child %s\n", lname->ln_name);
1869                 GOTO(out_put, rc = -EINVAL);
1870         }
1871
1872         if (lu_object_remote(child)) {
1873                 CERROR("Can not destroy remote object %s: rc = %d\n",
1874                        lname->ln_name, -EPERM);
1875                 GOTO(out_put, rc = -EPERM);
1876         }
1877         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1878                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1879
1880         rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
1881         if (rc) {
1882                 CERROR("Can not unlink child %s: rc = %d\n",
1883                         lname->ln_name, rc);
1884                 GOTO(out_put, rc);
1885         }
1886         CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1887                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1888 out_put:
1889         lu_object_put(env, ec_child);
1890         return rc;
1891 }
1892
1893 static int echo_destroy_object(const struct lu_env *env,
1894                                struct echo_device *ed,
1895                                struct lu_object *ec_parent,
1896                                char *name, int namelen,
1897                                __u64 id, __u32 mode,
1898                                int count)
1899 {
1900         struct echo_thread_info *info = echo_env_info(env);
1901         struct lu_name          *lname = &info->eti_lname;
1902         struct md_attr          *ma = &info->eti_ma;
1903         struct lu_device        *ld = ed->ed_next;
1904         struct lu_object        *parent;
1905         int                      rc = 0;
1906         int                      i;
1907         ENTRY;
1908
1909         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1910         if (parent == NULL)
1911                 RETURN(-EINVAL);
1912
1913         memset(ma, 0, sizeof(*ma));
1914         ma->ma_attr.la_mode = mode;
1915         ma->ma_attr.la_valid = LA_CTIME;
1916         ma->ma_attr.la_ctime = cfs_time_current_64();
1917         ma->ma_need = MA_INODE;
1918         ma->ma_valid = 0;
1919
1920         if (name != NULL) {
1921                 lname->ln_name = name;
1922                 lname->ln_namelen = namelen;
1923                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1924                                               ma);
1925                 RETURN(rc);
1926         }
1927
1928         /*prepare the requests*/
1929         for (i = 0; i < count; i++) {
1930                 char *tmp_name = info->eti_name;
1931
1932                 ma->ma_valid = 0;
1933                 echo_md_build_name(lname, tmp_name, id);
1934
1935                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1936                                               ma);
1937                 if (rc) {
1938                         CERROR("Can not unlink child %s: rc = %d\n", name, rc);
1939                         break;
1940                 }
1941                 id++;
1942         }
1943
1944         RETURN(rc);
1945 }
1946
1947 static struct lu_object *echo_resolve_path(const struct lu_env *env,
1948                                            struct echo_device *ed, char *path,
1949                                            int path_len)
1950 {
1951         struct lu_device        *ld = ed->ed_next;
1952         struct echo_thread_info *info = echo_env_info(env);
1953         struct lu_fid           *fid = &info->eti_fid;
1954         struct lu_name          *lname = &info->eti_lname;
1955         struct lu_object        *parent = NULL;
1956         struct lu_object        *child = NULL;
1957         int                      rc = 0;
1958         ENTRY;
1959
1960         *fid = ed->ed_root_fid;
1961
1962         /* In the function below, .hs_keycmp resolves to
1963          * lu_obj_hop_keycmp() */
1964         /* coverity[overrun-buffer-val] */
1965         parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1966         if (IS_ERR(parent)) {
1967                 CERROR("Can not find the parent "DFID": rc = %ld\n",
1968                         PFID(fid), PTR_ERR(parent));
1969                 RETURN(parent);
1970         }
1971
1972         while (1) {
1973                 struct lu_object *ld_parent;
1974                 char *e;
1975
1976                 e = strsep(&path, "/");
1977                 if (e == NULL)
1978                         break;
1979
1980                 if (e[0] == 0) {
1981                         if (!path || path[0] == '\0')
1982                                 break;
1983                         continue;
1984                 }
1985
1986                 lname->ln_name = e;
1987                 lname->ln_namelen = strlen(e);
1988
1989                 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
1990                 if (ld_parent == NULL) {
1991                         lu_object_put(env, parent);
1992                         rc = -EINVAL;
1993                         break;
1994                 }
1995
1996                 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
1997                 lu_object_put(env, parent);
1998                 if (IS_ERR(child)) {
1999                         rc = (int)PTR_ERR(child);
2000                         CERROR("lookup %s under parent "DFID": rc = %d\n",
2001                                 lname->ln_name, PFID(lu_object_fid(ld_parent)),
2002                                 rc);
2003                         break;
2004                 }
2005                 parent = child;
2006         }
2007         if (rc)
2008                 RETURN(ERR_PTR(rc));
2009
2010         RETURN(parent);
2011 }
2012
2013 static void echo_ucred_init(struct lu_env *env)
2014 {
2015         struct lu_ucred *ucred = lu_ucred(env);
2016
2017         ucred->uc_valid = UCRED_INVALID;
2018
2019         ucred->uc_suppgids[0] = -1;
2020         ucred->uc_suppgids[1] = -1;
2021
2022         ucred->uc_uid = ucred->uc_o_uid  =
2023                                 from_kuid(&init_user_ns, current_uid());
2024         ucred->uc_gid = ucred->uc_o_gid  =
2025                                 from_kgid(&init_user_ns, current_gid());
2026         ucred->uc_fsuid = ucred->uc_o_fsuid =
2027                                 from_kuid(&init_user_ns, current_fsuid());
2028         ucred->uc_fsgid = ucred->uc_o_fsgid =
2029                                 from_kgid(&init_user_ns, current_fsgid());
2030         ucred->uc_cap = cfs_curproc_cap_pack();
2031
2032         /* remove fs privilege for non-root user. */
2033         if (ucred->uc_fsuid)
2034                 ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2035         ucred->uc_valid = UCRED_NEW;
2036 }
2037
2038 static void echo_ucred_fini(struct lu_env *env)
2039 {
2040         struct lu_ucred *ucred = lu_ucred(env);
2041         ucred->uc_valid = UCRED_INIT;
2042 }
2043
2044 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
2045 #define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
2046 static int echo_md_handler(struct echo_device *ed, int command,
2047                            char *path, int path_len, __u64 id, int count,
2048                            struct obd_ioctl_data *data)
2049 {
2050         struct echo_thread_info *info;
2051         struct lu_device      *ld = ed->ed_next;
2052         struct lu_env         *env;
2053         __u16                  refcheck;
2054         struct lu_object      *parent;
2055         char                  *name = NULL;
2056         int                    namelen = data->ioc_plen2;
2057         int                    rc = 0;
2058         ENTRY;
2059
2060         if (ld == NULL) {
2061                 CERROR("MD echo client is not being initialized properly\n");
2062                 RETURN(-EINVAL);
2063         }
2064
2065         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2066                 CERROR("Only support MDD layer right now!\n");
2067                 RETURN(-EINVAL);
2068         }
2069
2070         env = cl_env_get(&refcheck);
2071         if (IS_ERR(env))
2072                 RETURN(PTR_ERR(env));
2073
2074         rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
2075         if (rc != 0)
2076                 GOTO(out_env, rc);
2077
2078         /* init big_lmm buffer */
2079         info = echo_env_info(env);
2080         LASSERT(info->eti_big_lmm == NULL);
2081         OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2082         if (info->eti_big_lmm == NULL)
2083                 GOTO(out_env, rc = -ENOMEM);
2084         info->eti_big_lmmsize = MIN_MD_SIZE;
2085
2086         parent = echo_resolve_path(env, ed, path, path_len);
2087         if (IS_ERR(parent)) {
2088                 CERROR("Can not resolve the path %s: rc = %ld\n", path,
2089                         PTR_ERR(parent));
2090                 GOTO(out_free, rc = PTR_ERR(parent));
2091         }
2092
2093         if (namelen > 0) {
2094                 OBD_ALLOC(name, namelen + 1);
2095                 if (name == NULL)
2096                         GOTO(out_put, rc = -ENOMEM);
2097                 if (copy_from_user(name, data->ioc_pbuf2, namelen))
2098                         GOTO(out_name, rc = -EFAULT);
2099         }
2100
2101         echo_ucred_init(env);
2102
2103         switch (command) {
2104         case ECHO_MD_CREATE:
2105         case ECHO_MD_MKDIR: {
2106                 struct echo_thread_info *info = echo_env_info(env);
2107                 __u32 mode = data->ioc_obdo2.o_mode;
2108                 struct lu_fid *fid = &info->eti_fid;
2109                 int stripe_count = (int)data->ioc_obdo2.o_misc;
2110                 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2111
2112                 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2113                 if (rc != 0)
2114                         break;
2115
2116                 /* In the function below, .hs_keycmp resolves to
2117                  * lu_obj_hop_keycmp() */
2118                 /* coverity[overrun-buffer-val] */
2119                 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2120                                            id, mode, count, stripe_count,
2121                                            stripe_index);
2122                 break;
2123         }
2124         case ECHO_MD_DESTROY:
2125         case ECHO_MD_RMDIR: {
2126                 __u32 mode = data->ioc_obdo2.o_mode;
2127
2128                 rc = echo_destroy_object(env, ed, parent, name, namelen,
2129                                          id, mode, count);
2130                 break;
2131         }
2132         case ECHO_MD_LOOKUP:
2133                 rc = echo_lookup_object(env, ed, parent, id, count);
2134                 break;
2135         case ECHO_MD_GETATTR:
2136                 rc = echo_getattr_object(env, ed, parent, id, count);
2137                 break;
2138         case ECHO_MD_SETATTR:
2139                 rc = echo_setattr_object(env, ed, parent, id, count);
2140                 break;
2141         default:
2142                 CERROR("unknown command %d\n", command);
2143                 rc = -EINVAL;
2144                 break;
2145         }
2146         echo_ucred_fini(env);
2147
2148 out_name:
2149         if (name != NULL)
2150                 OBD_FREE(name, namelen + 1);
2151 out_put:
2152         lu_object_put(env, parent);
2153 out_free:
2154         LASSERT(info->eti_big_lmm);
2155         OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2156         info->eti_big_lmm = NULL;
2157         info->eti_big_lmmsize = 0;
2158 out_env:
2159         cl_env_put(env, &refcheck);
2160         return rc;
2161 }
2162 #endif /* HAVE_SERVER_SUPPORT */
2163
2164 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2165                               struct obdo *oa)
2166 {
2167         struct echo_object      *eco;
2168         struct echo_client_obd  *ec = ed->ed_ec;
2169         int created = 0;
2170         int rc;
2171         ENTRY;
2172
2173         if (!(oa->o_valid & OBD_MD_FLID) ||
2174             !(oa->o_valid & OBD_MD_FLGROUP) ||
2175             !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
2176                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2177                 RETURN(-EINVAL);
2178         }
2179
2180         if (ostid_id(&oa->o_oi) == 0) {
2181                 rc = ostid_set_id(&oa->o_oi, ++last_object_id);
2182                 if (rc)
2183                         GOTO(failed, rc);
2184         }
2185
2186         rc = obd_create(env, ec->ec_exp, oa);
2187         if (rc != 0) {
2188                 CERROR("Cannot create objects: rc = %d\n", rc);
2189                 GOTO(failed, rc);
2190         }
2191
2192         created = 1;
2193
2194         oa->o_valid |= OBD_MD_FLID;
2195
2196         eco = cl_echo_object_find(ed, &oa->o_oi);
2197         if (IS_ERR(eco))
2198                 GOTO(failed, rc = PTR_ERR(eco));
2199         cl_echo_object_put(eco);
2200
2201         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
2202         EXIT;
2203
2204 failed:
2205         if (created && rc != 0)
2206                 obd_destroy(env, ec->ec_exp, oa);
2207
2208         if (rc != 0)
2209                 CERROR("create object failed with: rc = %d\n", rc);
2210
2211         return rc;
2212 }
2213
2214 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2215                            struct obdo *oa)
2216 {
2217         struct echo_object *eco;
2218         int rc;
2219         ENTRY;
2220
2221         if (!(oa->o_valid & OBD_MD_FLID) ||
2222             !(oa->o_valid & OBD_MD_FLGROUP) ||
2223             ostid_id(&oa->o_oi) == 0) {
2224                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2225                 RETURN(-EINVAL);
2226         }
2227
2228         rc = 0;
2229         eco = cl_echo_object_find(ed, &oa->o_oi);
2230         if (!IS_ERR(eco))
2231                 *ecop = eco;
2232         else
2233                 rc = PTR_ERR(eco);
2234
2235         RETURN(rc);
2236 }
2237
2238 static void echo_put_object(struct echo_object *eco)
2239 {
2240         int rc;
2241
2242         rc = cl_echo_object_put(eco);
2243         if (rc)
2244                 CERROR("%s: echo client drop an object failed: rc = %d\n",
2245                        eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
2246 }
2247
2248 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
2249                                          u64 offset, u64 count)
2250 {
2251         char    *addr;
2252         u64      stripe_off;
2253         u64      stripe_id;
2254         int      delta;
2255
2256         /* no partial pages on the client */
2257         LASSERT(count == PAGE_SIZE);
2258
2259         addr = kmap(page);
2260
2261         for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2262                 if (rw == OBD_BRW_WRITE) {
2263                         stripe_off = offset + delta;
2264                         stripe_id = id;
2265                 } else {
2266                         stripe_off = 0xdeadbeef00c0ffeeULL;
2267                         stripe_id = 0xdeadbeef00c0ffeeULL;
2268                 }
2269                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2270                                   stripe_off, stripe_id);
2271         }
2272
2273         kunmap(page);
2274 }
2275
2276 static int
2277 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
2278 {
2279         u64      stripe_off;
2280         u64      stripe_id;
2281         char   *addr;
2282         int     delta;
2283         int     rc;
2284         int     rc2;
2285
2286         /* no partial pages on the client */
2287         LASSERT(count == PAGE_SIZE);
2288
2289         addr = kmap(page);
2290
2291         for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2292                 stripe_off = offset + delta;
2293                 stripe_id = id;
2294
2295                 rc2 = block_debug_check("test_brw",
2296                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
2297                                         stripe_off, stripe_id);
2298                 if (rc2 != 0) {
2299                         CERROR("Error in echo object %#llx\n", id);
2300                         rc = rc2;
2301                 }
2302         }
2303
2304         kunmap(page);
2305         return rc;
2306 }
2307
2308 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2309                             struct echo_object *eco, u64 offset,
2310                             u64 count, int async)
2311 {
2312         size_t                  npages;
2313         struct brw_page        *pga;
2314         struct brw_page        *pgp;
2315         struct page            **pages;
2316         u64                      off;
2317         size_t                  i;
2318         int                     rc;
2319         int                     verify;
2320         gfp_t                   gfp_mask;
2321         u32                     brw_flags = 0;
2322         ENTRY;
2323
2324         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2325                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2326                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2327
2328         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
2329
2330         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2331
2332         if ((count & (~PAGE_MASK)) != 0)
2333                 RETURN(-EINVAL);
2334
2335         /* XXX think again with misaligned I/O */
2336         npages = count >> PAGE_SHIFT;
2337
2338         if (rw == OBD_BRW_WRITE)
2339                 brw_flags = OBD_BRW_ASYNC;
2340
2341         OBD_ALLOC(pga, npages * sizeof(*pga));
2342         if (pga == NULL)
2343                 RETURN(-ENOMEM);
2344
2345         OBD_ALLOC(pages, npages * sizeof(*pages));
2346         if (pages == NULL) {
2347                 OBD_FREE(pga, npages * sizeof(*pga));
2348                 RETURN(-ENOMEM);
2349         }
2350
2351         for (i = 0, pgp = pga, off = offset;
2352              i < npages;
2353              i++, pgp++, off += PAGE_SIZE) {
2354
2355                 LASSERT(pgp->pg == NULL);       /* for cleanup */
2356
2357                 rc = -ENOMEM;
2358                 pgp->pg = alloc_page(gfp_mask);
2359                 if (pgp->pg == NULL)
2360                         goto out;
2361
2362                 pages[i] = pgp->pg;
2363                 pgp->count = PAGE_SIZE;
2364                 pgp->off = off;
2365                 pgp->flag = brw_flags;
2366
2367                 if (verify)
2368                         echo_client_page_debug_setup(pgp->pg, rw,
2369                                                      ostid_id(&oa->o_oi), off,
2370                                                      pgp->count);
2371         }
2372
2373         /* brw mode can only be used at client */
2374         LASSERT(ed->ed_next != NULL);
2375         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2376
2377  out:
2378         if (rc != 0 || rw != OBD_BRW_READ)
2379                 verify = 0;
2380
2381         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2382                 if (pgp->pg == NULL)
2383                         continue;
2384
2385                 if (verify) {
2386                         int vrc;
2387                         vrc = echo_client_page_debug_check(pgp->pg,
2388                                                            ostid_id(&oa->o_oi),
2389                                                            pgp->off, pgp->count);
2390                         if (vrc != 0 && rc == 0)
2391                                 rc = vrc;
2392                 }
2393                 __free_page(pgp->pg);
2394         }
2395         OBD_FREE(pga, npages * sizeof(*pga));
2396         OBD_FREE(pages, npages * sizeof(*pages));
2397         RETURN(rc);
2398 }
2399
2400 static int echo_client_prep_commit(const struct lu_env *env,
2401                                    struct obd_export *exp, int rw,
2402                                    struct obdo *oa, struct echo_object *eco,
2403                                    u64 offset, u64 count,
2404                                    u64 batch, int async)
2405 {
2406         struct obd_ioobj         ioo;
2407         struct niobuf_local     *lnb;
2408         struct niobuf_remote     rnb;
2409         u64                      off;
2410         u64                      npages, tot_pages, apc;
2411         int i, ret = 0, brw_flags = 0;
2412
2413         ENTRY;
2414
2415         if (count <= 0 || (count & ~PAGE_MASK) != 0)
2416                 RETURN(-EINVAL);
2417
2418         apc = npages = batch >> PAGE_SHIFT;
2419         tot_pages = count >> PAGE_SHIFT;
2420
2421         OBD_ALLOC(lnb, apc * sizeof(struct niobuf_local));
2422         if (lnb == NULL)
2423                 RETURN(-ENOMEM);
2424
2425         if (rw == OBD_BRW_WRITE && async)
2426                 brw_flags |= OBD_BRW_ASYNC;
2427
2428         obdo_to_ioobj(oa, &ioo);
2429
2430         off = offset;
2431
2432         for (; tot_pages > 0; tot_pages -= npages) {
2433                 int lpages;
2434
2435                 if (tot_pages < npages)
2436                         npages = tot_pages;
2437
2438                 rnb.rnb_offset = off;
2439                 rnb.rnb_len = npages * PAGE_SIZE;
2440                 rnb.rnb_flags = brw_flags;
2441                 ioo.ioo_bufcnt = 1;
2442                 off += npages * PAGE_SIZE;
2443
2444                 lpages = npages;
2445                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
2446                 if (ret != 0)
2447                         GOTO(out, ret);
2448
2449                 for (i = 0; i < lpages; i++) {
2450                         struct page *page = lnb[i].lnb_page;
2451
2452                         /* read past eof? */
2453                         if (page == NULL && lnb[i].lnb_rc == 0)
2454                                 continue;
2455
2456                         if (async)
2457                                 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
2458
2459                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2460                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2461                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2462                                 continue;
2463
2464                         if (rw == OBD_BRW_WRITE)
2465                                 echo_client_page_debug_setup(page, rw,
2466                                                         ostid_id(&oa->o_oi),
2467                                                         lnb[i].lnb_file_offset,
2468                                                         lnb[i].lnb_len);
2469                         else
2470                                 echo_client_page_debug_check(page,
2471                                                         ostid_id(&oa->o_oi),
2472                                                         lnb[i].lnb_file_offset,
2473                                                         lnb[i].lnb_len);
2474                 }
2475
2476                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
2477                                    ret);
2478                 if (ret != 0)
2479                         break;
2480
2481                 /* Reuse env context. */
2482                 lu_context_exit((struct lu_context *)&env->le_ctx);
2483                 lu_context_enter((struct lu_context *)&env->le_ctx);
2484         }
2485
2486 out:
2487         OBD_FREE(lnb, apc * sizeof(struct niobuf_local));
2488
2489         RETURN(ret);
2490 }
2491
2492 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2493                                  struct obd_export *exp,
2494                                  struct obd_ioctl_data *data)
2495 {
2496         struct obd_device *obd = class_exp2obd(exp);
2497         struct echo_device *ed = obd2echo_dev(obd);
2498         struct echo_client_obd *ec = ed->ed_ec;
2499         struct obdo *oa = &data->ioc_obdo1;
2500         struct echo_object *eco;
2501         int rc;
2502         int async = 0;
2503         long test_mode;
2504         ENTRY;
2505
2506         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2507
2508         rc = echo_get_object(&eco, ed, oa);
2509         if (rc)
2510                 RETURN(rc);
2511
2512         oa->o_valid &= ~OBD_MD_FLHANDLE;
2513
2514         /* OFD/obdfilter works only via prep/commit */
2515         test_mode = (long)data->ioc_pbuf1;
2516         if (ed->ed_next == NULL && test_mode != 3) {
2517                 test_mode = 3;
2518                 data->ioc_plen1 = data->ioc_count;
2519         }
2520
2521         if (test_mode == 3)
2522                 async = 1;
2523
2524         /* Truncate batch size to maximum */
2525         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2526                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2527
2528         switch (test_mode) {
2529         case 1:
2530                 /* fall through */
2531         case 2:
2532                 rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
2533                                       data->ioc_count, async);
2534                 break;
2535         case 3:
2536                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
2537                                              data->ioc_offset, data->ioc_count,
2538                                              data->ioc_plen1, async);
2539                 break;
2540         default:
2541                 rc = -EINVAL;
2542         }
2543
2544         echo_put_object(eco);
2545
2546         RETURN(rc);
2547 }
2548
2549 static int
2550 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2551                       void *karg, void __user *uarg)
2552 {
2553 #ifdef HAVE_SERVER_SUPPORT
2554         struct tgt_session_info *tsi;
2555 #endif
2556         struct obd_device      *obd = exp->exp_obd;
2557         struct echo_device     *ed = obd2echo_dev(obd);
2558         struct echo_client_obd *ec = ed->ed_ec;
2559         struct echo_object     *eco;
2560         struct obd_ioctl_data  *data = karg;
2561         struct lu_env          *env;
2562         struct obdo            *oa;
2563         struct lu_fid           fid;
2564         int                     rw = OBD_BRW_READ;
2565         int                     rc = 0;
2566 #ifdef HAVE_SERVER_SUPPORT
2567         struct lu_context        echo_session;
2568 #endif
2569         ENTRY;
2570
2571         oa = &data->ioc_obdo1;
2572         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2573                 oa->o_valid |= OBD_MD_FLGROUP;
2574                 ostid_set_seq_echo(&oa->o_oi);
2575         }
2576
2577         /* This FID is unpacked just for validation at this point */
2578         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2579         if (rc < 0)
2580                 RETURN(rc);
2581
2582         OBD_ALLOC_PTR(env);
2583         if (env == NULL)
2584                 RETURN(-ENOMEM);
2585
2586         rc = lu_env_init(env, LCT_DT_THREAD);
2587         if (rc)
2588                 GOTO(out_alloc, rc = -ENOMEM);
2589
2590 #ifdef HAVE_SERVER_SUPPORT
2591         env->le_ses = &echo_session;
2592         rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
2593         if (unlikely(rc < 0))
2594                 GOTO(out_env, rc);
2595         lu_context_enter(env->le_ses);
2596
2597         tsi = tgt_ses_info(env);
2598         tsi->tsi_exp = ec->ec_exp;
2599         tsi->tsi_jobid = NULL;
2600 #endif
2601         switch (cmd) {
2602         case OBD_IOC_CREATE:                    /* may create echo object */
2603                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2604                         GOTO (out, rc = -EPERM);
2605
2606                 rc = echo_create_object(env, ed, oa);
2607                 GOTO(out, rc);
2608
2609 #ifdef HAVE_SERVER_SUPPORT
2610         case OBD_IOC_ECHO_MD: {
2611                 int count;
2612                 int cmd;
2613                 char *dir = NULL;
2614                 int dirlen;
2615                 __u64 id;
2616
2617                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2618                         GOTO(out, rc = -EPERM);
2619
2620                 count = data->ioc_count;
2621                 cmd = data->ioc_command;
2622
2623                 id = data->ioc_obdo2.o_oi.oi.oi_id;
2624                 dirlen = data->ioc_plen1;
2625                 OBD_ALLOC(dir, dirlen + 1);
2626                 if (dir == NULL)
2627                         GOTO(out, rc = -ENOMEM);
2628
2629                 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2630                         OBD_FREE(dir, data->ioc_plen1 + 1);
2631                         GOTO(out, rc = -EFAULT);
2632                 }
2633
2634                 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2635                 OBD_FREE(dir, dirlen + 1);
2636                 GOTO(out, rc);
2637         }
2638         case OBD_IOC_ECHO_ALLOC_SEQ: {
2639                 struct lu_env   *cl_env;
2640                 __u16            refcheck;
2641                 __u64            seq;
2642                 int              max_count;
2643
2644                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2645                         GOTO(out, rc = -EPERM);
2646
2647                 cl_env = cl_env_get(&refcheck);
2648                 if (IS_ERR(cl_env))
2649                         GOTO(out, rc = PTR_ERR(cl_env));
2650
2651                 rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
2652                                             ECHO_MD_SES_TAG);
2653                 if (rc != 0) {
2654                         cl_env_put(cl_env, &refcheck);
2655                         GOTO(out, rc);
2656                 }
2657
2658                 rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
2659                 cl_env_put(cl_env, &refcheck);
2660                 if (rc < 0) {
2661                         CERROR("%s: Can not alloc seq: rc = %d\n",
2662                                obd->obd_name, rc);
2663                         GOTO(out, rc);
2664                 }
2665
2666                 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2667                         return -EFAULT;
2668
2669                 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2670                 if (copy_to_user(data->ioc_pbuf2, &max_count,
2671                                      data->ioc_plen2))
2672                         return -EFAULT;
2673                 GOTO(out, rc);
2674         }
2675 #endif /* HAVE_SERVER_SUPPORT */
2676         case OBD_IOC_DESTROY:
2677                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2678                         GOTO (out, rc = -EPERM);
2679
2680                 rc = echo_get_object(&eco, ed, oa);
2681                 if (rc == 0) {
2682                         rc = obd_destroy(env, ec->ec_exp, oa);
2683                         if (rc == 0)
2684                                 eco->eo_deleted = 1;
2685                         echo_put_object(eco);
2686                 }
2687                 GOTO(out, rc);
2688
2689         case OBD_IOC_GETATTR:
2690                 rc = echo_get_object(&eco, ed, oa);
2691                 if (rc == 0) {
2692                         rc = obd_getattr(env, ec->ec_exp, oa);
2693                         echo_put_object(eco);
2694                 }
2695                 GOTO(out, rc);
2696
2697         case OBD_IOC_SETATTR:
2698                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2699                         GOTO (out, rc = -EPERM);
2700
2701                 rc = echo_get_object(&eco, ed, oa);
2702                 if (rc == 0) {
2703                         rc = obd_setattr(env, ec->ec_exp, oa);
2704                         echo_put_object(eco);
2705                 }
2706                 GOTO(out, rc);
2707
2708         case OBD_IOC_BRW_WRITE:
2709                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2710                         GOTO (out, rc = -EPERM);
2711
2712                 rw = OBD_BRW_WRITE;
2713                 /* fall through */
2714         case OBD_IOC_BRW_READ:
2715                 rc = echo_client_brw_ioctl(env, rw, exp, data);
2716                 GOTO(out, rc);
2717
2718         default:
2719                 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2720                 GOTO (out, rc = -ENOTTY);
2721         }
2722
2723         EXIT;
2724 out:
2725 #ifdef HAVE_SERVER_SUPPORT
2726         lu_context_exit(env->le_ses);
2727         lu_context_fini(env->le_ses);
2728 out_env:
2729 #endif
2730         lu_env_fini(env);
2731 out_alloc:
2732         OBD_FREE_PTR(env);
2733
2734         return rc;
2735 }
2736
2737 static int echo_client_setup(const struct lu_env *env,
2738                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2739 {
2740         struct echo_client_obd *ec = &obddev->u.echo_client;
2741         struct obd_device *tgt;
2742         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2743         struct obd_connect_data *ocd = NULL;
2744         int rc;
2745         ENTRY;
2746
2747         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2748                 CERROR("requires a TARGET OBD name\n");
2749                 RETURN(-EINVAL);
2750         }
2751
2752         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2753         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2754                 CERROR("device not attached or not set up (%s)\n",
2755                        lustre_cfg_string(lcfg, 1));
2756                 RETURN(-EINVAL);
2757         }
2758
2759         spin_lock_init(&ec->ec_lock);
2760         INIT_LIST_HEAD(&ec->ec_objects);
2761         INIT_LIST_HEAD(&ec->ec_locks);
2762         ec->ec_unique = 0;
2763
2764         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2765 #ifdef HAVE_SERVER_SUPPORT
2766                 lu_context_tags_update(ECHO_MD_CTX_TAG);
2767                 lu_session_tags_update(ECHO_MD_SES_TAG);
2768 #else
2769                 CERROR("Local operations are NOT supported on client side. "
2770                        "Only remote operations are supported. Metadata client "
2771                        "must be run on server side.\n");
2772 #endif
2773                 RETURN(0);
2774         }
2775
2776         OBD_ALLOC(ocd, sizeof(*ocd));
2777         if (ocd == NULL) {
2778                 CERROR("Can't alloc ocd connecting to %s\n",
2779                        lustre_cfg_string(lcfg, 1));
2780                 return -ENOMEM;
2781         }
2782
2783         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2784                                  OBD_CONNECT_BRW_SIZE |
2785                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2786                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2787                                  OBD_CONNECT_FID;
2788         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2789         ocd->ocd_version = LUSTRE_VERSION_CODE;
2790         ocd->ocd_group = FID_SEQ_ECHO;
2791
2792         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2793         if (rc == 0) {
2794                 /* Turn off pinger because it connects to tgt obd directly. */
2795                 spin_lock(&tgt->obd_dev_lock);
2796                 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2797                 spin_unlock(&tgt->obd_dev_lock);
2798         }
2799
2800         OBD_FREE(ocd, sizeof(*ocd));
2801
2802         if (rc != 0) {
2803                 CERROR("fail to connect to device %s\n",
2804                        lustre_cfg_string(lcfg, 1));
2805                 return (rc);
2806         }
2807
2808         RETURN(rc);
2809 }
2810
2811 static int echo_client_cleanup(struct obd_device *obddev)
2812 {
2813         struct echo_device *ed = obd2echo_dev(obddev);
2814         struct echo_client_obd *ec = &obddev->u.echo_client;
2815         int rc;
2816         ENTRY;
2817
2818         /*Do nothing for Metadata echo client*/
2819         if (ed == NULL )
2820                 RETURN(0);
2821
2822         if (ed->ed_next_ismd) {
2823 #ifdef HAVE_SERVER_SUPPORT
2824                 lu_context_tags_clear(ECHO_MD_CTX_TAG);
2825                 lu_session_tags_clear(ECHO_MD_SES_TAG);
2826 #else
2827                 CERROR("This is client-side only module, does not support "
2828                         "metadata echo client.\n");
2829 #endif
2830                 RETURN(0);
2831         }
2832
2833         if (!list_empty(&obddev->obd_exports)) {
2834                 CERROR("still has clients!\n");
2835                 RETURN(-EBUSY);
2836         }
2837
2838         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2839         rc = obd_disconnect(ec->ec_exp);
2840         if (rc != 0)
2841                 CERROR("fail to disconnect device: %d\n", rc);
2842
2843         RETURN(rc);
2844 }
2845
2846 static int echo_client_connect(const struct lu_env *env,
2847                                struct obd_export **exp,
2848                                struct obd_device *src, struct obd_uuid *cluuid,
2849                                struct obd_connect_data *data, void *localdata)
2850 {
2851         int                rc;
2852         struct lustre_handle conn = { 0 };
2853
2854         ENTRY;
2855         rc = class_connect(&conn, src, cluuid);
2856         if (rc == 0) {
2857                 *exp = class_conn2export(&conn);
2858         }
2859
2860         RETURN (rc);
2861 }
2862
2863 static int echo_client_disconnect(struct obd_export *exp)
2864 {
2865         int                     rc;
2866         ENTRY;
2867
2868         if (exp == NULL)
2869                 GOTO(out, rc = -EINVAL);
2870
2871         rc = class_disconnect(exp);
2872         GOTO(out, rc);
2873  out:
2874         return rc;
2875 }
2876
2877 static struct obd_ops echo_client_obd_ops = {
2878         .o_owner       = THIS_MODULE,
2879         .o_iocontrol   = echo_client_iocontrol,
2880         .o_connect     = echo_client_connect,
2881         .o_disconnect  = echo_client_disconnect
2882 };
2883
2884 static int __init obdecho_init(void)
2885 {
2886         int rc;
2887
2888         ENTRY;
2889         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2890
2891         LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2892
2893 # ifdef HAVE_SERVER_SUPPORT
2894         rc = echo_persistent_pages_init();
2895         if (rc != 0)
2896                 goto failed_0;
2897
2898         rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
2899                                  LUSTRE_ECHO_NAME, NULL);
2900         if (rc != 0)
2901                 goto failed_1;
2902 # endif
2903
2904         rc = lu_kmem_init(echo_caches);
2905         if (rc == 0) {
2906                 rc = class_register_type(&echo_client_obd_ops, NULL, true, NULL,
2907                                          LUSTRE_ECHO_CLIENT_NAME,
2908                                          &echo_device_type);
2909                 if (rc)
2910                         lu_kmem_fini(echo_caches);
2911         }
2912
2913 # ifdef HAVE_SERVER_SUPPORT
2914         if (rc == 0)
2915                 RETURN(0);
2916
2917         class_unregister_type(LUSTRE_ECHO_NAME);
2918 failed_1:
2919         echo_persistent_pages_fini();
2920 failed_0:
2921 # endif
2922         RETURN(rc);
2923 }
2924
2925 static void __exit obdecho_exit(void)
2926 {
2927         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2928         lu_kmem_fini(echo_caches);
2929
2930 #ifdef HAVE_SERVER_SUPPORT
2931         class_unregister_type(LUSTRE_ECHO_NAME);
2932         echo_persistent_pages_fini();
2933 #endif
2934 }
2935
2936 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2937 MODULE_DESCRIPTION("Lustre Echo Client test driver");
2938 MODULE_VERSION(LUSTRE_VERSION_STRING);
2939 MODULE_LICENSE("GPL");
2940
2941 module_init(obdecho_init);
2942 module_exit(obdecho_exit);
2943
2944 /** @} echo_client */