Whamcloud - gitweb
LU-6245 libcfs: remove mem wrappers for libcfs
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include <libcfs/libcfs.h>
39
40 #include <obd.h>
41 #include <obd_support.h>
42 #include <obd_class.h>
43 #include <lustre_debug.h>
44 #include <lprocfs_status.h>
45 #include <cl_object.h>
46 #include <lustre_fid.h>
47 #include <lustre_acl.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #ifdef HAVE_SERVER_SUPPORT
51 # include <md_object.h>
52 #endif /* HAVE_SERVER_SUPPORT */
53
54 #include "echo_internal.h"
55
56 /** \defgroup echo_client Echo Client
57  * @{
58  */
59
60 struct echo_device {
61         struct cl_device          ed_cl;
62         struct echo_client_obd   *ed_ec;
63
64         struct cl_site            ed_site_myself;
65         struct cl_site           *ed_site;
66         struct lu_device         *ed_next;
67         int                       ed_next_ismd;
68         struct lu_client_seq     *ed_cl_seq;
69 #ifdef HAVE_SERVER_SUPPORT
70         struct local_oid_storage *ed_los;
71         struct lu_fid             ed_root_fid;
72 #endif /* HAVE_SERVER_SUPPORT */
73 };
74
75 struct echo_object {
76         struct cl_object        eo_cl;
77         struct cl_object_header eo_hdr;
78         struct echo_device     *eo_dev;
79         struct list_head        eo_obj_chain;
80         struct lov_oinfo       *eo_oinfo;
81         atomic_t                eo_npages;
82         int                     eo_deleted;
83 };
84
85 struct echo_object_conf {
86         struct cl_object_conf   eoc_cl;
87         struct lov_oinfo      **eoc_oinfo;
88 };
89
90 struct echo_page {
91         struct cl_page_slice    ep_cl;
92         struct mutex            ep_lock;
93 };
94
95 struct echo_lock {
96         struct cl_lock_slice    el_cl;
97         struct list_head        el_chain;
98         struct echo_object     *el_object;
99         __u64                   el_cookie;
100         atomic_t                el_refcount;
101 };
102
#ifdef HAVE_SERVER_SUPPORT
/** Name of the directory created as the echo root, see echo_md_root_get(). */
static const char echo_md_root_dir_name[] = "ROOT_ECHO";

/**
 * Alias structure that lets this file read members of struct mdd_device.
 *
 * NOTE(review): the member order presumably has to stay in step with
 * struct mdd_device, which is not visible here -- do not reorder.
 */
struct echo_md_device {
        struct md_device emd_md_dev;
        struct obd_export *emd_child_exp;
        struct dt_device *emd_child;
        struct dt_device *emd_bottom;
        struct lu_fid emd_root_fid;
        struct lu_fid emd_local_root_fid;
};
#endif /* HAVE_SERVER_SUPPORT */
119
/*
 * Forward declarations: echo_device_alloc() calls both of these before
 * their definitions appear in the file.
 */
static int echo_client_setup(const struct lu_env *env,
                             struct obd_device *obddev,
                             struct lustre_cfg *lcfg);

static int echo_client_cleanup(struct obd_device *obddev);
124
125
126 /** \defgroup echo_helpers Helper functions
127  * @{
128  */
129 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
130 {
131         return container_of0(dev, struct echo_device, ed_cl);
132 }
133
134 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
135 {
136         return &d->ed_cl;
137 }
138
139 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
140 {
141         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
142 }
143
144 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
145 {
146         return &eco->eo_cl;
147 }
148
149 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
150 {
151         return container_of(o, struct echo_object, eo_cl);
152 }
153
154 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
155 {
156         return container_of(s, struct echo_page, ep_cl);
157 }
158
159 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
160 {
161         return container_of(s, struct echo_lock, el_cl);
162 }
163
164 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
165 {
166         return ecl->el_cl.cls_lock;
167 }
168
169 static struct lu_context_key echo_thread_key;
170 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
171 {
172         struct echo_thread_info *info;
173         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
174         LASSERT(info != NULL);
175         return info;
176 }
177
178 static inline
179 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
180 {
181         return container_of(c, struct echo_object_conf, eoc_cl);
182 }
183
184 #ifdef HAVE_SERVER_SUPPORT
185 static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
186 {
187         return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
188 }
189
190 static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
191 {
192         return &d->emd_md_dev.md_lu_dev;
193 }
194
195 static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
196 {
197         return emd2lu_dev(d)->ld_site->ld_seq_site;
198 }
199
200 static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
201 {
202         return d->emd_md_dev.md_lu_dev.ld_obd;
203 }
204 #endif /* HAVE_SERVER_SUPPORT */
205
206 /** @} echo_helpers */
207
208 static int cl_echo_object_put(struct echo_object *eco);
209 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
210                               struct page **pages, int npages, int async);
211
212 struct echo_thread_info {
213         struct echo_object_conf eti_conf;
214         struct lustre_md        eti_md;
215
216         struct cl_2queue        eti_queue;
217         struct cl_io            eti_io;
218         struct cl_lock          eti_lock;
219         struct lu_fid           eti_fid;
220         struct lu_fid           eti_fid2;
221 #ifdef HAVE_SERVER_SUPPORT
222         struct md_op_spec       eti_spec;
223         struct lov_mds_md_v3    eti_lmm;
224         struct lov_user_md_v3   eti_lum;
225         struct md_attr          eti_ma;
226         struct lu_name          eti_lname;
227         /* per-thread values, can be re-used */
228         void                    *eti_big_lmm;
229         int                     eti_big_lmmsize;
230         char                    eti_name[20];
231         struct lu_buf           eti_buf;
232         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
233 #endif
234 };
235
/*
 * Session data is not used at the moment; the structure only exists so
 * that echo_session_key has something to allocate.
 */
struct echo_session_info {
        unsigned long dummy;
};
240
241 static struct kmem_cache *echo_lock_kmem;
242 static struct kmem_cache *echo_object_kmem;
243 static struct kmem_cache *echo_thread_kmem;
244 static struct kmem_cache *echo_session_kmem;
245 /* static struct kmem_cache *echo_req_kmem; */
246
247 static struct lu_kmem_descr echo_caches[] = {
248         {
249                 .ckd_cache = &echo_lock_kmem,
250                 .ckd_name  = "echo_lock_kmem",
251                 .ckd_size  = sizeof (struct echo_lock)
252         },
253         {
254                 .ckd_cache = &echo_object_kmem,
255                 .ckd_name  = "echo_object_kmem",
256                 .ckd_size  = sizeof (struct echo_object)
257         },
258         {
259                 .ckd_cache = &echo_thread_kmem,
260                 .ckd_name  = "echo_thread_kmem",
261                 .ckd_size  = sizeof (struct echo_thread_info)
262         },
263         {
264                 .ckd_cache = &echo_session_kmem,
265                 .ckd_name  = "echo_session_kmem",
266                 .ckd_size  = sizeof (struct echo_session_info)
267         },
268         {
269                 .ckd_cache = NULL
270         }
271 };
272
273 /** \defgroup echo_page Page operations
274  *
275  * Echo page operations.
276  *
277  * @{
278  */
279 static int echo_page_own(const struct lu_env *env,
280                          const struct cl_page_slice *slice,
281                          struct cl_io *io, int nonblock)
282 {
283         struct echo_page *ep = cl2echo_page(slice);
284
285         if (!nonblock)
286                 mutex_lock(&ep->ep_lock);
287         else if (!mutex_trylock(&ep->ep_lock))
288                 return -EAGAIN;
289         return 0;
290 }
291
292 static void echo_page_disown(const struct lu_env *env,
293                              const struct cl_page_slice *slice,
294                              struct cl_io *io)
295 {
296         struct echo_page *ep = cl2echo_page(slice);
297
298         LASSERT(mutex_is_locked(&ep->ep_lock));
299         mutex_unlock(&ep->ep_lock);
300 }
301
302 static void echo_page_discard(const struct lu_env *env,
303                               const struct cl_page_slice *slice,
304                               struct cl_io *unused)
305 {
306         cl_page_delete(env, slice->cpl_page);
307 }
308
309 static int echo_page_is_vmlocked(const struct lu_env *env,
310                                  const struct cl_page_slice *slice)
311 {
312         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
313                 return -EBUSY;
314         return -ENODATA;
315 }
316
317 static void echo_page_completion(const struct lu_env *env,
318                                  const struct cl_page_slice *slice,
319                                  int ioret)
320 {
321         LASSERT(slice->cpl_page->cp_sync_io != NULL);
322 }
323
324 static void echo_page_fini(const struct lu_env *env,
325                            struct cl_page_slice *slice)
326 {
327         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
328         ENTRY;
329
330         atomic_dec(&eco->eo_npages);
331         page_cache_release(slice->cpl_page->cp_vmpage);
332         EXIT;
333 }
334
/** Page preparation hook for read and write; nothing to do, returns 0. */
static int echo_page_prep(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          struct cl_io *unused)
{
        /* echo pages need no preparation */
        return 0;
}
341
342 static int echo_page_print(const struct lu_env *env,
343                            const struct cl_page_slice *slice,
344                            void *cookie, lu_printer_t printer)
345 {
346         struct echo_page *ep = cl2echo_page(slice);
347
348         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
349                    ep, mutex_is_locked(&ep->ep_lock),
350                    slice->cpl_page->cp_vmpage);
351         return 0;
352 }
353
354 static const struct cl_page_operations echo_page_ops = {
355         .cpo_own           = echo_page_own,
356         .cpo_disown        = echo_page_disown,
357         .cpo_discard       = echo_page_discard,
358         .cpo_fini          = echo_page_fini,
359         .cpo_print         = echo_page_print,
360         .cpo_is_vmlocked   = echo_page_is_vmlocked,
361         .io = {
362                 [CRT_READ] = {
363                         .cpo_prep        = echo_page_prep,
364                         .cpo_completion  = echo_page_completion,
365                 },
366                 [CRT_WRITE] = {
367                         .cpo_prep        = echo_page_prep,
368                         .cpo_completion  = echo_page_completion,
369                 }
370         }
371 };
372 /** @} echo_page */
373
374 /** \defgroup echo_lock Locking
375  *
376  * echo lock operations
377  *
378  * @{
379  */
380 static void echo_lock_fini(const struct lu_env *env,
381                            struct cl_lock_slice *slice)
382 {
383         struct echo_lock *ecl = cl2echo_lock(slice);
384
385         LASSERT(list_empty(&ecl->el_chain));
386         OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
387 }
388
389 static struct cl_lock_operations echo_lock_ops = {
390         .clo_fini      = echo_lock_fini,
391 };
392
393 /** @} echo_lock */
394
395 /** \defgroup echo_cl_ops cl_object operations
396  *
397  * operations for cl_object
398  *
399  * @{
400  */
401 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
402                           struct cl_page *page, pgoff_t index)
403 {
404         struct echo_page *ep = cl_object_page_slice(obj, page);
405         struct echo_object *eco = cl2echo_obj(obj);
406         ENTRY;
407
408         page_cache_get(page->cp_vmpage);
409         mutex_init(&ep->ep_lock);
410         cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
411         atomic_inc(&eco->eo_npages);
412         RETURN(0);
413 }
414
/** I/O initialisation hook; the echo layer has nothing to set up. */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
                        struct cl_io *io)
{
        /* no per-io state at this layer */
        return 0;
}
420
421 static int echo_lock_init(const struct lu_env *env,
422                           struct cl_object *obj, struct cl_lock *lock,
423                           const struct cl_io *unused)
424 {
425         struct echo_lock *el;
426         ENTRY;
427
428         OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
429         if (el != NULL) {
430                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
431                 el->el_object = cl2echo_obj(obj);
432                 INIT_LIST_HEAD(&el->el_chain);
433                 atomic_set(&el->el_refcount, 0);
434         }
435         RETURN(el == NULL ? -ENOMEM : 0);
436 }
437
/** Configuration hook; the echo layer ignores \a conf and returns 0. */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
                         const struct cl_object_conf *conf)
{
        /* nothing to apply */
        return 0;
}
443
444 static const struct cl_object_operations echo_cl_obj_ops = {
445         .coo_page_init = echo_page_init,
446         .coo_lock_init = echo_lock_init,
447         .coo_io_init   = echo_io_init,
448         .coo_conf_set  = echo_conf_set
449 };
450 /** @} echo_cl_ops */
451
452 /** \defgroup echo_lu_ops lu_object operations
453  *
454  * operations for echo lu object.
455  *
456  * @{
457  */
458 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
459                             const struct lu_object_conf *conf)
460 {
461         struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
462         struct echo_client_obd *ec     = ed->ed_ec;
463         struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
464         ENTRY;
465
466         if (ed->ed_next) {
467                 struct lu_object  *below;
468                 struct lu_device  *under;
469
470                 under = ed->ed_next;
471                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
472                                                         under);
473                 if (below == NULL)
474                         RETURN(-ENOMEM);
475                 lu_object_add(obj, below);
476         }
477
478         if (!ed->ed_next_ismd) {
479                 const struct cl_object_conf *cconf = lu2cl_conf(conf);
480                 struct echo_object_conf *econf = cl2echo_conf(cconf);
481
482                 LASSERT(econf->eoc_oinfo != NULL);
483
484                 /* Transfer the oinfo pointer to eco that it won't be
485                  * freed. */
486                 eco->eo_oinfo = *econf->eoc_oinfo;
487                 *econf->eoc_oinfo = NULL;
488         } else {
489                 eco->eo_oinfo = NULL;
490         }
491
492         eco->eo_dev = ed;
493         atomic_set(&eco->eo_npages, 0);
494         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
495
496         spin_lock(&ec->ec_lock);
497         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
498         spin_unlock(&ec->ec_lock);
499
500         RETURN(0);
501 }
502
503 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
504 {
505         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
506         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
507         ENTRY;
508
509         LASSERT(atomic_read(&eco->eo_npages) == 0);
510
511         spin_lock(&ec->ec_lock);
512         list_del_init(&eco->eo_obj_chain);
513         spin_unlock(&ec->ec_lock);
514
515         lu_object_fini(obj);
516         lu_object_header_fini(obj->lo_header);
517
518         if (eco->eo_oinfo != NULL)
519                 OBD_FREE_PTR(eco->eo_oinfo);
520
521         OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
522         EXIT;
523 }
524
525 static int echo_object_print(const struct lu_env *env, void *cookie,
526                             lu_printer_t p, const struct lu_object *o)
527 {
528         struct echo_object *obj = cl2echo_obj(lu2cl(o));
529
530         return (*p)(env, cookie, "echoclient-object@%p", obj);
531 }
532
533 static const struct lu_object_operations echo_lu_obj_ops = {
534         .loo_object_init      = echo_object_init,
535         .loo_object_delete    = NULL,
536         .loo_object_release   = NULL,
537         .loo_object_free      = echo_object_free,
538         .loo_object_print     = echo_object_print,
539         .loo_object_invariant = NULL
540 };
541 /** @} echo_lu_ops */
542
543 /** \defgroup echo_lu_dev_ops  lu_device operations
544  *
545  * Operations for echo lu device.
546  *
547  * @{
548  */
549 static struct lu_object *echo_object_alloc(const struct lu_env *env,
550                                            const struct lu_object_header *hdr,
551                                            struct lu_device *dev)
552 {
553         struct echo_object *eco;
554         struct lu_object *obj = NULL;
555         ENTRY;
556
557         /* we're the top dev. */
558         LASSERT(hdr == NULL);
559         OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
560         if (eco != NULL) {
561                 struct cl_object_header *hdr = &eco->eo_hdr;
562
563                 obj = &echo_obj2cl(eco)->co_lu;
564                 cl_object_header_init(hdr);
565                 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
566
567                 lu_object_init(obj, &hdr->coh_lu, dev);
568                 lu_object_add_top(&hdr->coh_lu, obj);
569
570                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
571                 obj->lo_ops       = &echo_lu_obj_ops;
572         }
573         RETURN(obj);
574 }
575
576 static struct lu_device_operations echo_device_lu_ops = {
577         .ldo_object_alloc   = echo_object_alloc,
578 };
579
580 /** @} echo_lu_dev_ops */
581
582 static struct cl_device_operations echo_device_cl_ops = {
583 };
584
585 /** \defgroup echo_init Setup and teardown
586  *
587  * Init and fini functions for echo client.
588  *
589  * @{
590  */
591 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
592 {
593         struct cl_site *site = &ed->ed_site_myself;
594         int rc;
595
596         /* initialize site */
597         rc = cl_site_init(site, &ed->ed_cl);
598         if (rc) {
599                 CERROR("Cannot initilize site for echo client(%d)\n", rc);
600                 return rc;
601         }
602
603         rc = lu_site_init_finish(&site->cs_lu);
604         if (rc)
605                 return rc;
606
607         ed->ed_site = site;
608         return 0;
609 }
610
611 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
612 {
613         if (ed->ed_site) {
614                 if (!ed->ed_next_ismd)
615                         cl_site_fini(ed->ed_site);
616                 ed->ed_site = NULL;
617         }
618 }
619
620 static void *echo_thread_key_init(const struct lu_context *ctx,
621                                   struct lu_context_key *key)
622 {
623         struct echo_thread_info *info;
624
625         OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
626         if (info == NULL)
627                 info = ERR_PTR(-ENOMEM);
628         return info;
629 }
630
631 static void echo_thread_key_fini(const struct lu_context *ctx,
632                          struct lu_context_key *key, void *data)
633 {
634         struct echo_thread_info *info = data;
635         OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
636 }
637
/** Context exit hook for the thread key; deliberately does nothing. */
static void echo_thread_key_exit(const struct lu_context *ctx,
                                 struct lu_context_key *key, void *data)
{
        /* no per-exit cleanup needed */
}
642
643 static struct lu_context_key echo_thread_key = {
644         .lct_tags = LCT_CL_THREAD,
645         .lct_init = echo_thread_key_init,
646         .lct_fini = echo_thread_key_fini,
647         .lct_exit = echo_thread_key_exit
648 };
649
650 static void *echo_session_key_init(const struct lu_context *ctx,
651                                   struct lu_context_key *key)
652 {
653         struct echo_session_info *session;
654
655         OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
656         if (session == NULL)
657                 session = ERR_PTR(-ENOMEM);
658         return session;
659 }
660
661 static void echo_session_key_fini(const struct lu_context *ctx,
662                                  struct lu_context_key *key, void *data)
663 {
664         struct echo_session_info *session = data;
665         OBD_SLAB_FREE_PTR(session, echo_session_kmem);
666 }
667
/** Context exit hook for the session key; deliberately does nothing. */
static void echo_session_key_exit(const struct lu_context *ctx,
                                  struct lu_context_key *key, void *data)
{
        /* no per-exit cleanup needed */
}
672
673 static struct lu_context_key echo_session_key = {
674         .lct_tags = LCT_SESSION,
675         .lct_init = echo_session_key_init,
676         .lct_fini = echo_session_key_fini,
677         .lct_exit = echo_session_key_exit
678 };
679
680 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
681
682 #ifdef HAVE_SERVER_SUPPORT
683 # define ECHO_SEQ_WIDTH 0xffffffff
684 static int echo_fid_init(struct echo_device *ed, char *obd_name,
685                          struct seq_server_site *ss)
686 {
687         char *prefix;
688         int rc;
689         ENTRY;
690
691         OBD_ALLOC_PTR(ed->ed_cl_seq);
692         if (ed->ed_cl_seq == NULL)
693                 RETURN(-ENOMEM);
694
695         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
696         if (prefix == NULL)
697                 GOTO(out_free_seq, rc = -ENOMEM);
698
699         snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
700
701         /* Init client side sequence-manager */
702         rc = seq_client_init(ed->ed_cl_seq, NULL,
703                              LUSTRE_SEQ_METADATA,
704                              prefix, ss->ss_server_seq);
705         ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
706         OBD_FREE(prefix, MAX_OBD_NAME + 5);
707         if (rc)
708                 GOTO(out_free_seq, rc);
709
710         RETURN(0);
711
712 out_free_seq:
713         OBD_FREE_PTR(ed->ed_cl_seq);
714         ed->ed_cl_seq = NULL;
715         RETURN(rc);
716 }
717
718 static int echo_fid_fini(struct obd_device *obddev)
719 {
720         struct echo_device *ed = obd2echo_dev(obddev);
721         ENTRY;
722
723         if (ed->ed_cl_seq != NULL) {
724                 seq_client_fini(ed->ed_cl_seq);
725                 OBD_FREE_PTR(ed->ed_cl_seq);
726                 ed->ed_cl_seq = NULL;
727         }
728
729         RETURN(0);
730 }
731
732 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
733 {
734         ENTRY;
735
736         if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
737                 local_oid_storage_fini(env, ed->ed_los);
738                 ed->ed_los = NULL;
739         }
740 }
741
742 static int
743 echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
744                           struct local_oid_storage *los,
745                           const struct lu_fid *pfid, const char *name,
746                           __u32 mode, struct lu_fid *fid)
747 {
748         struct dt_object        *parent = NULL;
749         struct dt_object        *dto = NULL;
750         int                      rc = 0;
751         ENTRY;
752
753         LASSERT(!fid_is_zero(pfid));
754         parent = dt_locate(env, emd->emd_bottom, pfid);
755         if (unlikely(IS_ERR(parent)))
756                 RETURN(PTR_ERR(parent));
757
758         /* create local file with @fid */
759         dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
760                                                  parent, name, mode);
761         if (IS_ERR(dto))
762                 GOTO(out_put, rc = PTR_ERR(dto));
763
764         *fid = *lu_object_fid(&dto->do_lu);
765         /* since stack is not fully set up the local_storage uses own stack
766          * and we should drop its object from cache */
767         lu_object_put_nocache(env, &dto->do_lu);
768
769         EXIT;
770 out_put:
771         lu_object_put(env, &parent->do_lu);
772         RETURN(rc);
773 }
774
775 static int
776 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
777                  struct echo_device *ed)
778 {
779         struct lu_fid                    fid;
780         int                              rc = 0;
781         ENTRY;
782
783         /* Setup local dirs */
784         fid.f_seq = FID_SEQ_LOCAL_NAME;
785         fid.f_oid = 1;
786         fid.f_ver = 0;
787         rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
788         if (rc != 0)
789                 RETURN(rc);
790
791         lu_echo_root_fid(&fid);
792         if (echo_md_seq_site(emd)->ss_node_id == 0) {
793                 rc = echo_md_local_file_create(env, emd, ed->ed_los,
794                                                &emd->emd_local_root_fid,
795                                                echo_md_root_dir_name, S_IFDIR |
796                                                S_IRUGO | S_IWUSR | S_IXUGO,
797                                                &fid);
798                 if (rc != 0) {
799                         CERROR("%s: create md echo root fid failed: rc = %d\n",
800                                emd2obd_dev(emd)->obd_name, rc);
801                         GOTO(out_los, rc);
802                 }
803         }
804         ed->ed_root_fid = fid;
805
806         RETURN(0);
807 out_los:
808         echo_ed_los_fini(env, ed);
809
810         RETURN(rc);
811 }
812 #endif /* HAVE_SERVER_SUPPORT */
813
814 static struct lu_device *echo_device_alloc(const struct lu_env *env,
815                                            struct lu_device_type *t,
816                                            struct lustre_cfg *cfg)
817 {
818         struct lu_device   *next;
819         struct echo_device *ed;
820         struct cl_device   *cd;
821         struct obd_device  *obd = NULL; /* to keep compiler happy */
822         struct obd_device  *tgt;
823         const char *tgt_type_name;
824         int rc;
825         int cleanup = 0;
826         ENTRY;
827
828         OBD_ALLOC_PTR(ed);
829         if (ed == NULL)
830                 GOTO(out, rc = -ENOMEM);
831
832         cleanup = 1;
833         cd = &ed->ed_cl;
834         rc = cl_device_init(cd, t);
835         if (rc)
836                 GOTO(out, rc);
837
838         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
839         cd->cd_ops = &echo_device_cl_ops;
840
841         cleanup = 2;
842         obd = class_name2obd(lustre_cfg_string(cfg, 0));
843         LASSERT(obd != NULL);
844         LASSERT(env != NULL);
845
846         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
847         if (tgt == NULL) {
848                 CERROR("Can not find tgt device %s\n",
849                         lustre_cfg_string(cfg, 1));
850                 GOTO(out, rc = -ENODEV);
851         }
852
853         next = tgt->obd_lu_dev;
854
855         if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
856                 ed->ed_next_ismd = 1;
857         } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
858                    strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
859                 ed->ed_next_ismd = 0;
860                 rc = echo_site_init(env, ed);
861                 if (rc)
862                         GOTO(out, rc);
863         } else {
864                 GOTO(out, rc = -EINVAL);
865         }
866
867         cleanup = 3;
868
869         rc = echo_client_setup(env, obd, cfg);
870         if (rc)
871                 GOTO(out, rc);
872
873         ed->ed_ec = &obd->u.echo_client;
874         cleanup = 4;
875
876         if (ed->ed_next_ismd) {
877 #ifdef HAVE_SERVER_SUPPORT
878                 /* Suppose to connect to some Metadata layer */
879                 struct lu_site          *ls = NULL;
880                 struct lu_device        *ld = NULL;
881                 struct md_device        *md = NULL;
882                 struct echo_md_device   *emd = NULL;
883                 int                      found = 0;
884
885                 if (next == NULL) {
886                         CERROR("%s is not lu device type!\n",
887                                lustre_cfg_string(cfg, 1));
888                         GOTO(out, rc = -EINVAL);
889                 }
890
891                 tgt_type_name = lustre_cfg_string(cfg, 2);
892                 if (!tgt_type_name) {
893                         CERROR("%s no type name for echo %s setup\n",
894                                 lustre_cfg_string(cfg, 1),
895                                 tgt->obd_type->typ_name);
896                         GOTO(out, rc = -EINVAL);
897                 }
898
899                 ls = next->ld_site;
900
901                 spin_lock(&ls->ls_ld_lock);
902                 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
903                         if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
904                                 found = 1;
905                                 break;
906                         }
907                 }
908                 spin_unlock(&ls->ls_ld_lock);
909
910                 if (found == 0) {
911                         CERROR("%s is not lu device type!\n",
912                                lustre_cfg_string(cfg, 1));
913                         GOTO(out, rc = -EINVAL);
914                 }
915
916                 next = ld;
917                 /* For MD echo client, it will use the site in MDS stack */
918                 ed->ed_site_myself.cs_lu = *ls;
919                 ed->ed_site = &ed->ed_site_myself;
920                 ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu;
921                 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
922                 if (rc) {
923                         CERROR("echo fid init error %d\n", rc);
924                         GOTO(out, rc);
925                 }
926
927                 md = lu2md_dev(next);
928                 emd = lu2emd_dev(&md->md_lu_dev);
929                 rc = echo_md_root_get(env, emd, ed);
930                 if (rc != 0) {
931                         CERROR("%s: get root error: rc = %d\n",
932                                 emd2obd_dev(emd)->obd_name, rc);
933                         GOTO(out, rc);
934                 }
935 #else /* !HAVE_SERVER_SUPPORT */
936                 CERROR("Local operations are NOT supported on client side. "
937                        "Only remote operations are supported. Metadata client "
938                        "must be run on server side.\n");
939                 GOTO(out, rc = -EOPNOTSUPP);
940 #endif /* HAVE_SERVER_SUPPORT */
941         } else {
942                  /* if echo client is to be stacked upon ost device, the next is
943                   * NULL since ost is not a clio device so far */
944                 if (next != NULL && !lu_device_is_cl(next))
945                         next = NULL;
946
947                 tgt_type_name = tgt->obd_type->typ_name;
948                 if (next != NULL) {
949                         LASSERT(next != NULL);
950                         if (next->ld_site != NULL)
951                                 GOTO(out, rc = -EBUSY);
952
953                         next->ld_site = &ed->ed_site->cs_lu;
954                         rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
955                                                      next->ld_type->ldt_name,
956                                                      NULL);
957                         if (rc)
958                                 GOTO(out, rc);
959                 } else
960                         LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
961         }
962
963         ed->ed_next = next;
964         RETURN(&cd->cd_lu_dev);
965 out:
966         switch(cleanup) {
967         case 4: {
968                 int rc2;
969                 rc2 = echo_client_cleanup(obd);
970                 if (rc2)
971                         CERROR("Cleanup obd device %s error(%d)\n",
972                                obd->obd_name, rc2);
973         }
974
975         case 3:
976                 echo_site_fini(env, ed);
977         case 2:
978                 cl_device_fini(&ed->ed_cl);
979         case 1:
980                 OBD_FREE_PTR(ed);
981         case 0:
982         default:
983                 break;
984         }
985         return(ERR_PTR(rc));
986 }
987
/**
 * ldto_device_init hook of the echo client device type.
 *
 * Reaching this entry point is treated as a bug: the body is an
 * unconditional LBUG(). The trailing return only satisfies the signature.
 */
static int echo_device_init(const struct lu_env *env,
                            struct lu_device *d,
                            const char *name,
                            struct lu_device *next)
{
        LBUG();
        return 0;
}
994
995 static struct lu_device *echo_device_fini(const struct lu_env *env,
996                                           struct lu_device *d)
997 {
998         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
999         struct lu_device *next = ed->ed_next;
1000
1001         while (next && !ed->ed_next_ismd)
1002                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
1003         return NULL;
1004 }
1005
/**
 * Release the cl_lock that backs an echo lock.
 *
 * \param[in] env         execution environment
 * \param[in] ecl         echo lock whose cl_lock is released
 * \param[in] still_used  accepted for the caller's convenience; not consulted
 */
static void echo_lock_release(const struct lu_env *env,
                              struct echo_lock *ecl,
                              int still_used)
{
        cl_lock_release(env, echo_lock2cl(ecl));
}
1014
1015 static struct lu_device *echo_device_free(const struct lu_env *env,
1016                                           struct lu_device *d)
1017 {
1018         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
1019         struct echo_client_obd *ec   = ed->ed_ec;
1020         struct echo_object     *eco;
1021         struct lu_device       *next = ed->ed_next;
1022
1023         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
1024                ed, next);
1025
1026         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1027
1028         /* check if there are objects still alive.
1029          * It shouldn't have any object because lu_site_purge would cleanup
1030          * all of cached objects. Anyway, probably the echo device is being
1031          * parallelly accessed.
1032          */
1033         spin_lock(&ec->ec_lock);
1034         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
1035                 eco->eo_deleted = 1;
1036         spin_unlock(&ec->ec_lock);
1037
1038         /* purge again */
1039         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1040
1041         CDEBUG(D_INFO,
1042                "Waiting for the reference of echo object to be dropped\n");
1043
1044         /* Wait for the last reference to be dropped. */
1045         spin_lock(&ec->ec_lock);
1046         while (!list_empty(&ec->ec_objects)) {
1047                 spin_unlock(&ec->ec_lock);
1048                 CERROR("echo_client still has objects at cleanup time, "
1049                        "wait for 1 second\n");
1050                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1051                                                    cfs_time_seconds(1));
1052                 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1053                 spin_lock(&ec->ec_lock);
1054         }
1055         spin_unlock(&ec->ec_lock);
1056
1057         LASSERT(list_empty(&ec->ec_locks));
1058
1059         CDEBUG(D_INFO, "No object exists, exiting...\n");
1060
1061         echo_client_cleanup(d->ld_obd);
1062 #ifdef HAVE_SERVER_SUPPORT
1063         echo_fid_fini(d->ld_obd);
1064         echo_ed_los_fini(env, ed);
1065 #endif
1066         while (next && !ed->ed_next_ismd)
1067                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
1068
1069         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
1070         echo_site_fini(env, ed);
1071         cl_device_fini(&ed->ed_cl);
1072         OBD_FREE_PTR(ed);
1073
1074         return NULL;
1075 }
1076
1077 static const struct lu_device_type_operations echo_device_type_ops = {
1078         .ldto_init = echo_type_init,
1079         .ldto_fini = echo_type_fini,
1080
1081         .ldto_start = echo_type_start,
1082         .ldto_stop  = echo_type_stop,
1083
1084         .ldto_device_alloc = echo_device_alloc,
1085         .ldto_device_free  = echo_device_free,
1086         .ldto_device_init  = echo_device_init,
1087         .ldto_device_fini  = echo_device_fini
1088 };
1089
1090 static struct lu_device_type echo_device_type = {
1091         .ldt_tags     = LU_DEVICE_CL,
1092         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
1093         .ldt_ops      = &echo_device_type_ops,
1094         .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1095 };
1096 /** @} echo_init */
1097
1098 /** \defgroup echo_exports Exported operations
1099  *
1100  * exporting functions to echo client
1101  *
1102  * @{
1103  */
1104
1105 /* Interfaces to echo client obd device */
1106 static struct echo_object *
1107 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
1108 {
1109         struct lu_env *env;
1110         struct echo_thread_info *info;
1111         struct echo_object_conf *conf;
1112         struct echo_object *eco;
1113         struct cl_object *obj;
1114         struct lov_oinfo *oinfo = NULL;
1115         struct lu_fid *fid;
1116         int refcheck;
1117         int rc;
1118         ENTRY;
1119
1120         LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
1121         LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
1122
1123         /* Never return an object if the obd is to be freed. */
1124         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1125                 RETURN(ERR_PTR(-ENODEV));
1126
1127         env = cl_env_get(&refcheck);
1128         if (IS_ERR(env))
1129                 RETURN((void *)env);
1130
1131         info = echo_env_info(env);
1132         conf = &info->eti_conf;
1133         if (d->ed_next) {
1134                 OBD_ALLOC_PTR(oinfo);
1135                 if (oinfo == NULL)
1136                         GOTO(out, eco = ERR_PTR(-ENOMEM));
1137
1138                 oinfo->loi_oi = *oi;
1139                 conf->eoc_cl.u.coc_oinfo = oinfo;
1140         }
1141
1142         /* If echo_object_init() is successful then ownership of oinfo
1143          * is transferred to the object. */
1144         conf->eoc_oinfo = &oinfo;
1145
1146         fid = &info->eti_fid;
1147         rc = ostid_to_fid(fid, oi, 0);
1148         if (rc != 0)
1149                 GOTO(out, eco = ERR_PTR(rc));
1150
1151         /* In the function below, .hs_keycmp resolves to
1152          * lu_obj_hop_keycmp() */
1153         /* coverity[overrun-buffer-val] */
1154         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1155         if (IS_ERR(obj))
1156                 GOTO(out, eco = (void*)obj);
1157
1158         eco = cl2echo_obj(obj);
1159         if (eco->eo_deleted) {
1160                 cl_object_put(env, obj);
1161                 eco = ERR_PTR(-EAGAIN);
1162         }
1163
1164 out:
1165         if (oinfo != NULL)
1166                 OBD_FREE_PTR(oinfo);
1167
1168         cl_env_put(env, &refcheck);
1169         RETURN(eco);
1170 }
1171
1172 static int cl_echo_object_put(struct echo_object *eco)
1173 {
1174         struct lu_env *env;
1175         struct cl_object *obj = echo_obj2cl(eco);
1176         int refcheck;
1177         ENTRY;
1178
1179         env = cl_env_get(&refcheck);
1180         if (IS_ERR(env))
1181                 RETURN(PTR_ERR(env));
1182
1183         /* an external function to kill an object? */
1184         if (eco->eo_deleted) {
1185                 struct lu_object_header *loh = obj->co_lu.lo_header;
1186                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1187                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1188         }
1189
1190         cl_object_put(env, obj);
1191         cl_env_put(env, &refcheck);
1192         RETURN(0);
1193 }
1194
1195 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1196                             u64 start, u64 end, int mode,
1197                             __u64 *cookie , __u32 enqflags)
1198 {
1199         struct cl_io *io;
1200         struct cl_lock *lck;
1201         struct cl_object *obj;
1202         struct cl_lock_descr *descr;
1203         struct echo_thread_info *info;
1204         int rc = -ENOMEM;
1205         ENTRY;
1206
1207         info = echo_env_info(env);
1208         io = &info->eti_io;
1209         lck = &info->eti_lock;
1210         obj = echo_obj2cl(eco);
1211
1212         memset(lck, 0, sizeof(*lck));
1213         descr = &lck->cll_descr;
1214         descr->cld_obj   = obj;
1215         descr->cld_start = cl_index(obj, start);
1216         descr->cld_end   = cl_index(obj, end);
1217         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1218         descr->cld_enq_flags = enqflags;
1219         io->ci_obj = obj;
1220
1221         rc = cl_lock_request(env, io, lck);
1222         if (rc == 0) {
1223                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1224                 struct echo_lock *el;
1225
1226                 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1227                 spin_lock(&ec->ec_lock);
1228                 if (list_empty(&el->el_chain)) {
1229                         list_add(&el->el_chain, &ec->ec_locks);
1230                         el->el_cookie = ++ec->ec_unique;
1231                 }
1232                 atomic_inc(&el->el_refcount);
1233                 *cookie = el->el_cookie;
1234                 spin_unlock(&ec->ec_lock);
1235         }
1236         RETURN(rc);
1237 }
1238
1239 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1240                            __u64 cookie)
1241 {
1242         struct echo_client_obd *ec = ed->ed_ec;
1243         struct echo_lock       *ecl = NULL;
1244         struct list_head        *el;
1245         int found = 0, still_used = 0;
1246         ENTRY;
1247
1248         LASSERT(ec != NULL);
1249         spin_lock(&ec->ec_lock);
1250         list_for_each(el, &ec->ec_locks) {
1251                 ecl = list_entry(el, struct echo_lock, el_chain);
1252                 CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
1253                 found = (ecl->el_cookie == cookie);
1254                 if (found) {
1255                         if (atomic_dec_and_test(&ecl->el_refcount))
1256                                 list_del_init(&ecl->el_chain);
1257                         else
1258                                 still_used = 1;
1259                         break;
1260                 }
1261         }
1262         spin_unlock(&ec->ec_lock);
1263
1264         if (!found)
1265                 RETURN(-ENOENT);
1266
1267         echo_lock_release(env, ecl, still_used);
1268         RETURN(0);
1269 }
1270
1271 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1272                                 struct cl_page *page)
1273 {
1274         struct echo_thread_info *info;
1275         struct cl_2queue        *queue;
1276
1277         info = echo_env_info(env);
1278         LASSERT(io == &info->eti_io);
1279
1280         queue = &info->eti_queue;
1281         cl_page_list_add(&queue->c2_qout, page);
1282 }
1283
1284 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1285                               struct page **pages, int npages, int async)
1286 {
1287         struct lu_env           *env;
1288         struct echo_thread_info *info;
1289         struct cl_object        *obj = echo_obj2cl(eco);
1290         struct echo_device      *ed  = eco->eo_dev;
1291         struct cl_2queue        *queue;
1292         struct cl_io            *io;
1293         struct cl_page          *clp;
1294         struct lustre_handle    lh = { 0 };
1295         int page_size = cl_page_size(obj);
1296         int refcheck;
1297         int rc;
1298         int i;
1299         ENTRY;
1300
1301         LASSERT((offset & ~PAGE_MASK) == 0);
1302         LASSERT(ed->ed_next != NULL);
1303         env = cl_env_get(&refcheck);
1304         if (IS_ERR(env))
1305                 RETURN(PTR_ERR(env));
1306
1307         info    = echo_env_info(env);
1308         io      = &info->eti_io;
1309         queue   = &info->eti_queue;
1310
1311         cl_2queue_init(queue);
1312
1313         io->ci_ignore_layout = 1;
1314         rc = cl_io_init(env, io, CIT_MISC, obj);
1315         if (rc < 0)
1316                 GOTO(out, rc);
1317         LASSERT(rc == 0);
1318
1319
1320         rc = cl_echo_enqueue0(env, eco, offset,
1321                               offset + npages * PAGE_CACHE_SIZE - 1,
1322                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1323                               CEF_NEVER);
1324         if (rc < 0)
1325                 GOTO(error_lock, rc);
1326
1327         for (i = 0; i < npages; i++) {
1328                 LASSERT(pages[i]);
1329                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1330                                    pages[i], CPT_TRANSIENT);
1331                 if (IS_ERR(clp)) {
1332                         rc = PTR_ERR(clp);
1333                         break;
1334                 }
1335                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1336
1337                 rc = cl_page_own(env, io, clp);
1338                 if (rc) {
1339                         LASSERT(clp->cp_state == CPS_FREEING);
1340                         cl_page_put(env, clp);
1341                         break;
1342                 }
1343
1344                 cl_2queue_add(queue, clp);
1345
1346                 /* drop the reference count for cl_page_find, so that the page
1347                  * will be freed in cl_2queue_fini. */
1348                 cl_page_put(env, clp);
1349                 cl_page_clip(env, clp, 0, page_size);
1350
1351                 offset += page_size;
1352         }
1353
1354         if (rc == 0) {
1355                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1356
1357                 async = async && (typ == CRT_WRITE);
1358                 if (async)
1359                         rc = cl_io_commit_async(env, io, &queue->c2_qin,
1360                                                 0, PAGE_SIZE,
1361                                                 echo_commit_callback);
1362                 else
1363                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1364                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1365                        async ? "async" : "sync", rc);
1366         }
1367
1368         cl_echo_cancel0(env, ed, lh.cookie);
1369         EXIT;
1370 error_lock:
1371         cl_2queue_discard(env, io, queue);
1372         cl_2queue_disown(env, io, queue);
1373         cl_2queue_fini(env, queue);
1374         cl_io_fini(env, io);
1375 out:
1376         cl_env_put(env, &refcheck);
1377         return rc;
1378 }
1379 /** @} echo_exports */
1380
1381
/* File-scope 64-bit value, zero at load time like any static.
 * NOTE(review): presumably remembers the most recently used object id;
 * its readers and writers are outside this part of the file - confirm. */
1382 static u64 last_object_id;
1383
1384 #ifdef HAVE_SERVER_SUPPORT
1385 static inline void echo_md_build_name(struct lu_name *lname, char *name,
1386                                       __u64 id)
1387 {
1388         sprintf(name, LPU64, id);
1389         lname->ln_name = name;
1390         lname->ln_namelen = strlen(name);
1391 }
1392
1393 /* similar to mdt_attr_get_complex */
1394 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1395                             struct md_attr *ma)
1396 {
1397         struct echo_thread_info *info = echo_env_info(env);
1398         int                      rc;
1399
1400         ENTRY;
1401
1402         LASSERT(ma->ma_lmm_size > 0);
1403
1404         rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1405         if (rc < 0)
1406                 RETURN(rc);
1407
1408         /* big_lmm may need to be grown */
1409         if (info->eti_big_lmmsize < rc) {
1410                 int size = size_roundup_power2(rc);
1411
1412                 if (info->eti_big_lmmsize > 0) {
1413                         /* free old buffer */
1414                         LASSERT(info->eti_big_lmm);
1415                         OBD_FREE_LARGE(info->eti_big_lmm,
1416                                        info->eti_big_lmmsize);
1417                         info->eti_big_lmm = NULL;
1418                         info->eti_big_lmmsize = 0;
1419                 }
1420
1421                 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1422                 if (info->eti_big_lmm == NULL)
1423                         RETURN(-ENOMEM);
1424                 info->eti_big_lmmsize = size;
1425         }
1426         LASSERT(info->eti_big_lmmsize >= rc);
1427
1428         info->eti_buf.lb_buf = info->eti_big_lmm;
1429         info->eti_buf.lb_len = info->eti_big_lmmsize;
1430         rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1431         if (rc < 0)
1432                 RETURN(rc);
1433
1434         ma->ma_valid |= MA_LOV;
1435         ma->ma_lmm = info->eti_big_lmm;
1436         ma->ma_lmm_size = rc;
1437
1438         RETURN(0);
1439 }
1440
1441 static int echo_attr_get_complex(const struct lu_env *env,
1442                                  struct md_object *next,
1443                                  struct md_attr *ma)
1444 {
1445         struct echo_thread_info *info = echo_env_info(env);
1446         struct lu_buf           *buf = &info->eti_buf;
1447         umode_t          mode = lu_object_attr(&next->mo_lu);
1448         int                      need = ma->ma_need;
1449         int                      rc = 0, rc2;
1450
1451         ENTRY;
1452
1453         ma->ma_valid = 0;
1454
1455         if (need & MA_INODE) {
1456                 ma->ma_need = MA_INODE;
1457                 rc = mo_attr_get(env, next, ma);
1458                 if (rc)
1459                         GOTO(out, rc);
1460                 ma->ma_valid |= MA_INODE;
1461         }
1462
1463         if (need & MA_LOV) {
1464                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1465                         LASSERT(ma->ma_lmm_size > 0);
1466                         buf->lb_buf = ma->ma_lmm;
1467                         buf->lb_len = ma->ma_lmm_size;
1468                         rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1469                         if (rc2 > 0) {
1470                                 ma->ma_lmm_size = rc2;
1471                                 ma->ma_valid |= MA_LOV;
1472                         } else if (rc2 == -ENODATA) {
1473                                 /* no LOV EA */
1474                                 ma->ma_lmm_size = 0;
1475                         } else if (rc2 == -ERANGE) {
1476                                 rc2 = echo_big_lmm_get(env, next, ma);
1477                                 if (rc2 < 0)
1478                                         GOTO(out, rc = rc2);
1479                         } else {
1480                                 GOTO(out, rc = rc2);
1481                         }
1482                 }
1483         }
1484
1485 #ifdef CONFIG_FS_POSIX_ACL
1486         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1487                 buf->lb_buf = ma->ma_acl;
1488                 buf->lb_len = ma->ma_acl_size;
1489                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1490                 if (rc2 > 0) {
1491                         ma->ma_acl_size = rc2;
1492                         ma->ma_valid |= MA_ACL_DEF;
1493                 } else if (rc2 == -ENODATA) {
1494                         /* no ACLs */
1495                         ma->ma_acl_size = 0;
1496                 } else {
1497                         GOTO(out, rc = rc2);
1498                 }
1499         }
1500 #endif
1501 out:
1502         ma->ma_need = need;
1503         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
1504                rc, ma->ma_valid, ma->ma_lmm);
1505         RETURN(rc);
1506 }
1507
1508 static int
1509 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1510                         struct md_object *parent, struct lu_fid *fid,
1511                         struct lu_name *lname, struct md_op_spec *spec,
1512                         struct md_attr *ma)
1513 {
1514         struct lu_object        *ec_child, *child;
1515         struct lu_device        *ld = ed->ed_next;
1516         struct echo_thread_info *info = echo_env_info(env);
1517         struct lu_fid           *fid2 = &info->eti_fid2;
1518         struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
1519         int                      rc;
1520
1521         ENTRY;
1522
1523         rc = mdo_lookup(env, parent, lname, fid2, spec);
1524         if (rc == 0)
1525                 return -EEXIST;
1526         else if (rc != -ENOENT)
1527                 return rc;
1528
1529         ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1530                                      fid, &conf);
1531         if (IS_ERR(ec_child)) {
1532                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1533                         PTR_ERR(ec_child));
1534                 RETURN(PTR_ERR(ec_child));
1535         }
1536
1537         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1538         if (child == NULL) {
1539                 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1540                 GOTO(out_put, rc = -EINVAL);
1541         }
1542
1543         CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1544                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1545
1546         /*
1547          * Do not perform lookup sanity check. We know that name does not exist.
1548          */
1549         spec->sp_cr_lookup = 0;
1550         rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1551         if (rc) {
1552                 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1553                 GOTO(out_put, rc);
1554         }
1555         CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
1556                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1557         EXIT;
1558 out_put:
1559         lu_object_put(env, ec_child);
1560         return rc;
1561 }
1562
1563 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1564                              struct md_attr *ma)
1565 {
1566         struct echo_thread_info *info = echo_env_info(env);
1567
1568         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1569                 ma->ma_lmm = (void *)&info->eti_lmm;
1570                 ma->ma_lmm_size = sizeof(info->eti_lmm);
1571         } else {
1572                 LASSERT(info->eti_big_lmmsize);
1573                 ma->ma_lmm = info->eti_big_lmm;
1574                 ma->ma_lmm_size = info->eti_big_lmmsize;
1575         }
1576
1577         return 0;
1578 }
1579
1580 static int echo_create_md_object(const struct lu_env *env,
1581                                  struct echo_device *ed,
1582                                  struct lu_object *ec_parent,
1583                                  struct lu_fid *fid,
1584                                  char *name, int namelen,
1585                                  __u64 id, __u32 mode, int count,
1586                                  int stripe_count, int stripe_offset)
1587 {
1588         struct lu_object        *parent;
1589         struct echo_thread_info *info = echo_env_info(env);
1590         struct lu_name          *lname = &info->eti_lname;
1591         struct md_op_spec       *spec = &info->eti_spec;
1592         struct md_attr          *ma = &info->eti_ma;
1593         struct lu_device        *ld = ed->ed_next;
1594         int                      rc = 0;
1595         int                      i;
1596
1597         ENTRY;
1598
1599         if (ec_parent == NULL)
1600                 return -1;
1601         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1602         if (parent == NULL)
1603                 RETURN(-ENXIO);
1604
1605         memset(ma, 0, sizeof(*ma));
1606         memset(spec, 0, sizeof(*spec));
1607         if (stripe_count != 0) {
1608                 spec->sp_cr_flags |= FMODE_WRITE;
1609                 echo_set_lmm_size(env, ld, ma);
1610                 if (stripe_count != -1) {
1611                         struct lov_user_md_v3 *lum = &info->eti_lum;
1612
1613                         lum->lmm_magic = LOV_USER_MAGIC_V3;
1614                         lum->lmm_stripe_count = stripe_count;
1615                         lum->lmm_stripe_offset = stripe_offset;
1616                         lum->lmm_pattern = 0;
1617                         spec->u.sp_ea.eadata = lum;
1618                         spec->u.sp_ea.eadatalen = sizeof(*lum);
1619                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1620                 }
1621         }
1622
1623         ma->ma_attr.la_mode = mode;
1624         ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1625         ma->ma_attr.la_ctime = cfs_time_current_64();
1626
1627         if (name != NULL) {
1628                 lname->ln_name = name;
1629                 lname->ln_namelen = namelen;
1630                 /* If name is specified, only create one object by name */
1631                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1632                                              spec, ma);
1633                 RETURN(rc);
1634         }
1635
1636         /* Create multiple object sequenced by id */
1637         for (i = 0; i < count; i++) {
1638                 char *tmp_name = info->eti_name;
1639
1640                 echo_md_build_name(lname, tmp_name, id);
1641
1642                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1643                                              spec, ma);
1644                 if (rc) {
1645                         CERROR("Can not create child %s: rc = %d\n", tmp_name,
1646                                 rc);
1647                         break;
1648                 }
1649                 id++;
1650                 fid->f_oid++;
1651         }
1652
1653         RETURN(rc);
1654 }
1655
1656 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1657                                         struct echo_device *ed,
1658                                         struct md_object *parent,
1659                                         struct lu_name *lname)
1660 {
1661         struct echo_thread_info *info = echo_env_info(env);
1662         struct lu_fid           *fid = &info->eti_fid;
1663         struct lu_object        *child;
1664         int    rc;
1665         ENTRY;
1666
1667         CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1668                PFID(fid), parent);
1669         rc = mdo_lookup(env, parent, lname, fid, NULL);
1670         if (rc) {
1671                 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1672                 RETURN(ERR_PTR(rc));
1673         }
1674
1675         /* In the function below, .hs_keycmp resolves to
1676          * lu_obj_hop_keycmp() */
1677         /* coverity[overrun-buffer-val] */
1678         child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1679
1680         RETURN(child);
1681 }
1682
1683 static int echo_setattr_object(const struct lu_env *env,
1684                                struct echo_device *ed,
1685                                struct lu_object *ec_parent,
1686                                __u64 id, int count)
1687 {
1688         struct lu_object        *parent;
1689         struct echo_thread_info *info = echo_env_info(env);
1690         struct lu_name          *lname = &info->eti_lname;
1691         char                    *name = info->eti_name;
1692         struct lu_device        *ld = ed->ed_next;
1693         struct lu_buf           *buf = &info->eti_buf;
1694         int                      rc = 0;
1695         int                      i;
1696
1697         ENTRY;
1698
1699         if (ec_parent == NULL)
1700                 return -1;
1701         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1702         if (parent == NULL)
1703                 RETURN(-ENXIO);
1704
1705         for (i = 0; i < count; i++) {
1706                 struct lu_object *ec_child, *child;
1707
1708                 echo_md_build_name(lname, name, id);
1709
1710                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1711                 if (IS_ERR(ec_child)) {
1712                         CERROR("Can't find child %s: rc = %ld\n",
1713                                 lname->ln_name, PTR_ERR(ec_child));
1714                         RETURN(PTR_ERR(ec_child));
1715                 }
1716
1717                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1718                 if (child == NULL) {
1719                         CERROR("Can not locate the child %s\n", lname->ln_name);
1720                         lu_object_put(env, ec_child);
1721                         rc = -EINVAL;
1722                         break;
1723                 }
1724
1725                 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1726                        PFID(lu_object_fid(child)));
1727
1728                 buf->lb_buf = info->eti_xattr_buf;
1729                 buf->lb_len = sizeof(info->eti_xattr_buf);
1730
1731                 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1732                 rc = mo_xattr_set(env, lu2md(child), buf, name,
1733                                   LU_XATTR_CREATE);
1734                 if (rc < 0) {
1735                         CERROR("Can not setattr child "DFID": rc = %d\n",
1736                                 PFID(lu_object_fid(child)), rc);
1737                         lu_object_put(env, ec_child);
1738                         break;
1739                 }
1740                 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1741                        PFID(lu_object_fid(child)));
1742                 id++;
1743                 lu_object_put(env, ec_child);
1744         }
1745         RETURN(rc);
1746 }
1747
1748 static int echo_getattr_object(const struct lu_env *env,
1749                                struct echo_device *ed,
1750                                struct lu_object *ec_parent,
1751                                __u64 id, int count)
1752 {
1753         struct lu_object        *parent;
1754         struct echo_thread_info *info = echo_env_info(env);
1755         struct lu_name          *lname = &info->eti_lname;
1756         char                    *name = info->eti_name;
1757         struct md_attr          *ma = &info->eti_ma;
1758         struct lu_device        *ld = ed->ed_next;
1759         int                      rc = 0;
1760         int                      i;
1761
1762         ENTRY;
1763
1764         if (ec_parent == NULL)
1765                 return -1;
1766         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1767         if (parent == NULL)
1768                 RETURN(-ENXIO);
1769
1770         memset(ma, 0, sizeof(*ma));
1771         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1772         ma->ma_acl = info->eti_xattr_buf;
1773         ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1774
1775         for (i = 0; i < count; i++) {
1776                 struct lu_object *ec_child, *child;
1777
1778                 ma->ma_valid = 0;
1779                 echo_md_build_name(lname, name, id);
1780                 echo_set_lmm_size(env, ld, ma);
1781
1782                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1783                 if (IS_ERR(ec_child)) {
1784                         CERROR("Can't find child %s: rc = %ld\n",
1785                                lname->ln_name, PTR_ERR(ec_child));
1786                         RETURN(PTR_ERR(ec_child));
1787                 }
1788
1789                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1790                 if (child == NULL) {
1791                         CERROR("Can not locate the child %s\n", lname->ln_name);
1792                         lu_object_put(env, ec_child);
1793                         RETURN(-EINVAL);
1794                 }
1795
1796                 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1797                        PFID(lu_object_fid(child)));
1798                 rc = echo_attr_get_complex(env, lu2md(child), ma);
1799                 if (rc) {
1800                         CERROR("Can not getattr child "DFID": rc = %d\n",
1801                                 PFID(lu_object_fid(child)), rc);
1802                         lu_object_put(env, ec_child);
1803                         break;
1804                 }
1805                 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1806                        PFID(lu_object_fid(child)));
1807                 id++;
1808                 lu_object_put(env, ec_child);
1809         }
1810
1811         RETURN(rc);
1812 }
1813
1814 static int echo_lookup_object(const struct lu_env *env,
1815                               struct echo_device *ed,
1816                               struct lu_object *ec_parent,
1817                               __u64 id, int count)
1818 {
1819         struct lu_object        *parent;
1820         struct echo_thread_info *info = echo_env_info(env);
1821         struct lu_name          *lname = &info->eti_lname;
1822         char                    *name = info->eti_name;
1823         struct lu_fid           *fid = &info->eti_fid;
1824         struct lu_device        *ld = ed->ed_next;
1825         int                      rc = 0;
1826         int                      i;
1827
1828         if (ec_parent == NULL)
1829                 return -1;
1830         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1831         if (parent == NULL)
1832                 return -ENXIO;
1833
1834         /*prepare the requests*/
1835         for (i = 0; i < count; i++) {
1836                 echo_md_build_name(lname, name, id);
1837
1838                 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1839                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1840
1841                 rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
1842                 if (rc) {
1843                         CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1844                         break;
1845                 }
1846                 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1847                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1848
1849                 id++;
1850         }
1851         return rc;
1852 }
1853
1854 static int echo_md_destroy_internal(const struct lu_env *env,
1855                                     struct echo_device *ed,
1856                                     struct md_object *parent,
1857                                     struct lu_name *lname,
1858                                     struct md_attr *ma)
1859 {
1860         struct lu_device   *ld = ed->ed_next;
1861         struct lu_object   *ec_child;
1862         struct lu_object   *child;
1863         int                 rc;
1864
1865         ENTRY;
1866
1867         ec_child = echo_md_lookup(env, ed, parent, lname);
1868         if (IS_ERR(ec_child)) {
1869                 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1870                         PTR_ERR(ec_child));
1871                 RETURN(PTR_ERR(ec_child));
1872         }
1873
1874         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1875         if (child == NULL) {
1876                 CERROR("Can not locate the child %s\n", lname->ln_name);
1877                 GOTO(out_put, rc = -EINVAL);
1878         }
1879
1880         if (lu_object_remote(child)) {
1881                 CERROR("Can not destroy remote object %s: rc = %d\n",
1882                        lname->ln_name, -EPERM);
1883                 GOTO(out_put, rc = -EPERM);
1884         }
1885         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1886                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1887
1888         rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
1889         if (rc) {
1890                 CERROR("Can not unlink child %s: rc = %d\n",
1891                         lname->ln_name, rc);
1892                 GOTO(out_put, rc);
1893         }
1894         CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1895                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1896 out_put:
1897         lu_object_put(env, ec_child);
1898         return rc;
1899 }
1900
1901 static int echo_destroy_object(const struct lu_env *env,
1902                                struct echo_device *ed,
1903                                struct lu_object *ec_parent,
1904                                char *name, int namelen,
1905                                __u64 id, __u32 mode,
1906                                int count)
1907 {
1908         struct echo_thread_info *info = echo_env_info(env);
1909         struct lu_name          *lname = &info->eti_lname;
1910         struct md_attr          *ma = &info->eti_ma;
1911         struct lu_device        *ld = ed->ed_next;
1912         struct lu_object        *parent;
1913         int                      rc = 0;
1914         int                      i;
1915         ENTRY;
1916
1917         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1918         if (parent == NULL)
1919                 RETURN(-EINVAL);
1920
1921         memset(ma, 0, sizeof(*ma));
1922         ma->ma_attr.la_mode = mode;
1923         ma->ma_attr.la_valid = LA_CTIME;
1924         ma->ma_attr.la_ctime = cfs_time_current_64();
1925         ma->ma_need = MA_INODE;
1926         ma->ma_valid = 0;
1927
1928         if (name != NULL) {
1929                 lname->ln_name = name;
1930                 lname->ln_namelen = namelen;
1931                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1932                                               ma);
1933                 RETURN(rc);
1934         }
1935
1936         /*prepare the requests*/
1937         for (i = 0; i < count; i++) {
1938                 char *tmp_name = info->eti_name;
1939
1940                 ma->ma_valid = 0;
1941                 echo_md_build_name(lname, tmp_name, id);
1942
1943                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1944                                               ma);
1945                 if (rc) {
1946                         CERROR("Can not unlink child %s: rc = %d\n", name, rc);
1947                         break;
1948                 }
1949                 id++;
1950         }
1951
1952         RETURN(rc);
1953 }
1954
1955 static struct lu_object *echo_resolve_path(const struct lu_env *env,
1956                                            struct echo_device *ed, char *path,
1957                                            int path_len)
1958 {
1959         struct lu_device        *ld = ed->ed_next;
1960         struct echo_thread_info *info = echo_env_info(env);
1961         struct lu_fid           *fid = &info->eti_fid;
1962         struct lu_name          *lname = &info->eti_lname;
1963         struct lu_object        *parent = NULL;
1964         struct lu_object        *child = NULL;
1965         int                      rc = 0;
1966         ENTRY;
1967
1968         *fid = ed->ed_root_fid;
1969
1970         /* In the function below, .hs_keycmp resolves to
1971          * lu_obj_hop_keycmp() */
1972         /* coverity[overrun-buffer-val] */
1973         parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1974         if (IS_ERR(parent)) {
1975                 CERROR("Can not find the parent "DFID": rc = %ld\n",
1976                         PFID(fid), PTR_ERR(parent));
1977                 RETURN(parent);
1978         }
1979
1980         while (1) {
1981                 struct lu_object *ld_parent;
1982                 char *e;
1983
1984                 e = strsep(&path, "/");
1985                 if (e == NULL)
1986                         break;
1987
1988                 if (e[0] == 0) {
1989                         if (!path || path[0] == '\0')
1990                                 break;
1991                         continue;
1992                 }
1993
1994                 lname->ln_name = e;
1995                 lname->ln_namelen = strlen(e);
1996
1997                 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
1998                 if (ld_parent == NULL) {
1999                         lu_object_put(env, parent);
2000                         rc = -EINVAL;
2001                         break;
2002                 }
2003
2004                 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
2005                 lu_object_put(env, parent);
2006                 if (IS_ERR(child)) {
2007                         rc = (int)PTR_ERR(child);
2008                         CERROR("lookup %s under parent "DFID": rc = %d\n",
2009                                 lname->ln_name, PFID(lu_object_fid(ld_parent)),
2010                                 rc);
2011                         break;
2012                 }
2013                 parent = child;
2014         }
2015         if (rc)
2016                 RETURN(ERR_PTR(rc));
2017
2018         RETURN(parent);
2019 }
2020
2021 static void echo_ucred_init(struct lu_env *env)
2022 {
2023         struct lu_ucred *ucred = lu_ucred(env);
2024
2025         ucred->uc_valid = UCRED_INVALID;
2026
2027         ucred->uc_suppgids[0] = -1;
2028         ucred->uc_suppgids[1] = -1;
2029
2030         ucred->uc_uid = ucred->uc_o_uid  =
2031                                 from_kuid(&init_user_ns, current_uid());
2032         ucred->uc_gid = ucred->uc_o_gid  =
2033                                 from_kgid(&init_user_ns, current_gid());
2034         ucred->uc_fsuid = ucred->uc_o_fsuid =
2035                                 from_kuid(&init_user_ns, current_fsuid());
2036         ucred->uc_fsgid = ucred->uc_o_fsgid =
2037                                 from_kgid(&init_user_ns, current_fsgid());
2038         ucred->uc_cap = cfs_curproc_cap_pack();
2039
2040         /* remove fs privilege for non-root user. */
2041         if (ucred->uc_fsuid)
2042                 ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2043         ucred->uc_valid = UCRED_NEW;
2044 }
2045
2046 static void echo_ucred_fini(struct lu_env *env)
2047 {
2048         struct lu_ucred *ucred = lu_ucred(env);
2049         ucred->uc_valid = UCRED_INIT;
2050 }
2051
2052 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
2053 #define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
2054 static int echo_md_handler(struct echo_device *ed, int command,
2055                            char *path, int path_len, __u64 id, int count,
2056                            struct obd_ioctl_data *data)
2057 {
2058         struct echo_thread_info *info;
2059         struct lu_device      *ld = ed->ed_next;
2060         struct lu_env         *env;
2061         int                    refcheck;
2062         struct lu_object      *parent;
2063         char                  *name = NULL;
2064         int                    namelen = data->ioc_plen2;
2065         int                    rc = 0;
2066         ENTRY;
2067
2068         if (ld == NULL) {
2069                 CERROR("MD echo client is not being initialized properly\n");
2070                 RETURN(-EINVAL);
2071         }
2072
2073         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2074                 CERROR("Only support MDD layer right now!\n");
2075                 RETURN(-EINVAL);
2076         }
2077
2078         env = cl_env_get(&refcheck);
2079         if (IS_ERR(env))
2080                 RETURN(PTR_ERR(env));
2081
2082         rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
2083         if (rc != 0)
2084                 GOTO(out_env, rc);
2085
2086         /* init big_lmm buffer */
2087         info = echo_env_info(env);
2088         LASSERT(info->eti_big_lmm == NULL);
2089         OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2090         if (info->eti_big_lmm == NULL)
2091                 GOTO(out_env, rc = -ENOMEM);
2092         info->eti_big_lmmsize = MIN_MD_SIZE;
2093
2094         parent = echo_resolve_path(env, ed, path, path_len);
2095         if (IS_ERR(parent)) {
2096                 CERROR("Can not resolve the path %s: rc = %ld\n", path,
2097                         PTR_ERR(parent));
2098                 GOTO(out_free, rc = PTR_ERR(parent));
2099         }
2100
2101         if (namelen > 0) {
2102                 OBD_ALLOC(name, namelen + 1);
2103                 if (name == NULL)
2104                         GOTO(out_put, rc = -ENOMEM);
2105                 if (copy_from_user(name, data->ioc_pbuf2, namelen))
2106                         GOTO(out_name, rc = -EFAULT);
2107         }
2108
2109         echo_ucred_init(env);
2110
2111         switch (command) {
2112         case ECHO_MD_CREATE:
2113         case ECHO_MD_MKDIR: {
2114                 struct echo_thread_info *info = echo_env_info(env);
2115                 __u32 mode = data->ioc_obdo2.o_mode;
2116                 struct lu_fid *fid = &info->eti_fid;
2117                 int stripe_count = (int)data->ioc_obdo2.o_misc;
2118                 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2119
2120                 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2121                 if (rc != 0)
2122                         break;
2123
2124                 /* In the function below, .hs_keycmp resolves to
2125                  * lu_obj_hop_keycmp() */
2126                 /* coverity[overrun-buffer-val] */
2127                 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2128                                            id, mode, count, stripe_count,
2129                                            stripe_index);
2130                 break;
2131         }
2132         case ECHO_MD_DESTROY:
2133         case ECHO_MD_RMDIR: {
2134                 __u32 mode = data->ioc_obdo2.o_mode;
2135
2136                 rc = echo_destroy_object(env, ed, parent, name, namelen,
2137                                          id, mode, count);
2138                 break;
2139         }
2140         case ECHO_MD_LOOKUP:
2141                 rc = echo_lookup_object(env, ed, parent, id, count);
2142                 break;
2143         case ECHO_MD_GETATTR:
2144                 rc = echo_getattr_object(env, ed, parent, id, count);
2145                 break;
2146         case ECHO_MD_SETATTR:
2147                 rc = echo_setattr_object(env, ed, parent, id, count);
2148                 break;
2149         default:
2150                 CERROR("unknown command %d\n", command);
2151                 rc = -EINVAL;
2152                 break;
2153         }
2154         echo_ucred_fini(env);
2155
2156 out_name:
2157         if (name != NULL)
2158                 OBD_FREE(name, namelen + 1);
2159 out_put:
2160         lu_object_put(env, parent);
2161 out_free:
2162         LASSERT(info->eti_big_lmm);
2163         OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2164         info->eti_big_lmm = NULL;
2165         info->eti_big_lmmsize = 0;
2166 out_env:
2167         cl_env_put(env, &refcheck);
2168         return rc;
2169 }
2170 #endif /* HAVE_SERVER_SUPPORT */
2171
2172 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2173                               struct obdo *oa, struct obd_trans_info *oti)
2174 {
2175         struct echo_object      *eco;
2176         struct echo_client_obd  *ec = ed->ed_ec;
2177         int created = 0;
2178         int rc;
2179         ENTRY;
2180
2181         if (!(oa->o_valid & OBD_MD_FLID) ||
2182             !(oa->o_valid & OBD_MD_FLGROUP) ||
2183             !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
2184                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2185                 RETURN(-EINVAL);
2186         }
2187
2188         if (ostid_id(&oa->o_oi) == 0)
2189                 ostid_set_id(&oa->o_oi, ++last_object_id);
2190
2191         rc = obd_create(env, ec->ec_exp, oa, oti);
2192         if (rc != 0) {
2193                 CERROR("Cannot create objects: rc = %d\n", rc);
2194                 GOTO(failed, rc);
2195         }
2196
2197         created = 1;
2198
2199         oa->o_valid |= OBD_MD_FLID;
2200
2201         eco = cl_echo_object_find(ed, &oa->o_oi);
2202         if (IS_ERR(eco))
2203                 GOTO(failed, rc = PTR_ERR(eco));
2204         cl_echo_object_put(eco);
2205
2206         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
2207         EXIT;
2208
2209 failed:
2210         if (created && rc != 0)
2211                 obd_destroy(env, ec->ec_exp, oa, oti);
2212
2213         if (rc != 0)
2214                 CERROR("create object failed with: rc = %d\n", rc);
2215
2216         return rc;
2217 }
2218
2219 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2220                            struct obdo *oa)
2221 {
2222         struct echo_object *eco;
2223         int rc;
2224         ENTRY;
2225
2226         if (!(oa->o_valid & OBD_MD_FLID) ||
2227             !(oa->o_valid & OBD_MD_FLGROUP) ||
2228             ostid_id(&oa->o_oi) == 0) {
2229                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2230                 RETURN(-EINVAL);
2231         }
2232
2233         rc = 0;
2234         eco = cl_echo_object_find(ed, &oa->o_oi);
2235         if (!IS_ERR(eco))
2236                 *ecop = eco;
2237         else
2238                 rc = PTR_ERR(eco);
2239
2240         RETURN(rc);
2241 }
2242
2243 static void echo_put_object(struct echo_object *eco)
2244 {
2245         int rc;
2246
2247         rc = cl_echo_object_put(eco);
2248         if (rc)
2249                 CERROR("%s: echo client drop an object failed: rc = %d\n",
2250                        eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
2251 }
2252
2253 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
2254                                          u64 offset, u64 count)
2255 {
2256         char    *addr;
2257         u64      stripe_off;
2258         u64      stripe_id;
2259         int      delta;
2260
2261         /* no partial pages on the client */
2262         LASSERT(count == PAGE_CACHE_SIZE);
2263
2264         addr = kmap(page);
2265
2266         for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2267                 if (rw == OBD_BRW_WRITE) {
2268                         stripe_off = offset + delta;
2269                         stripe_id = id;
2270                 } else {
2271                         stripe_off = 0xdeadbeef00c0ffeeULL;
2272                         stripe_id = 0xdeadbeef00c0ffeeULL;
2273                 }
2274                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2275                                   stripe_off, stripe_id);
2276         }
2277
2278         kunmap(page);
2279 }
2280
2281 static int
2282 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
2283 {
2284         u64      stripe_off;
2285         u64      stripe_id;
2286         char   *addr;
2287         int     delta;
2288         int     rc;
2289         int     rc2;
2290
2291         /* no partial pages on the client */
2292         LASSERT(count == PAGE_CACHE_SIZE);
2293
2294         addr = kmap(page);
2295
2296         for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2297                 stripe_off = offset + delta;
2298                 stripe_id = id;
2299
2300                 rc2 = block_debug_check("test_brw",
2301                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
2302                                         stripe_off, stripe_id);
2303                 if (rc2 != 0) {
2304                         CERROR ("Error in echo object "LPX64"\n", id);
2305                         rc = rc2;
2306                 }
2307         }
2308
2309         kunmap(page);
2310         return rc;
2311 }
2312
2313 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2314                             struct echo_object *eco, u64 offset,
2315                             u64 count, int async,
2316                             struct obd_trans_info *oti)
2317 {
2318         size_t                  npages;
2319         struct brw_page        *pga;
2320         struct brw_page        *pgp;
2321         struct page            **pages;
2322         u64                      off;
2323         size_t                  i;
2324         int                     rc;
2325         int                     verify;
2326         gfp_t                   gfp_mask;
2327         u32                     brw_flags = 0;
2328         ENTRY;
2329
2330         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2331                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2332                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2333
2334         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
2335
2336         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2337
2338         if ((count & (~PAGE_MASK)) != 0)
2339                 RETURN(-EINVAL);
2340
2341         /* XXX think again with misaligned I/O */
2342         npages = count >> PAGE_CACHE_SHIFT;
2343
2344         if (rw == OBD_BRW_WRITE)
2345                 brw_flags = OBD_BRW_ASYNC;
2346
2347         OBD_ALLOC(pga, npages * sizeof(*pga));
2348         if (pga == NULL)
2349                 RETURN(-ENOMEM);
2350
2351         OBD_ALLOC(pages, npages * sizeof(*pages));
2352         if (pages == NULL) {
2353                 OBD_FREE(pga, npages * sizeof(*pga));
2354                 RETURN(-ENOMEM);
2355         }
2356
2357         for (i = 0, pgp = pga, off = offset;
2358              i < npages;
2359              i++, pgp++, off += PAGE_CACHE_SIZE) {
2360
2361                 LASSERT (pgp->pg == NULL);      /* for cleanup */
2362
2363                 rc = -ENOMEM;
2364                 OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
2365                 if (pgp->pg == NULL)
2366                         goto out;
2367
2368                 pages[i] = pgp->pg;
2369                 pgp->count = PAGE_CACHE_SIZE;
2370                 pgp->off = off;
2371                 pgp->flag = brw_flags;
2372
2373                 if (verify)
2374                         echo_client_page_debug_setup(pgp->pg, rw,
2375                                                      ostid_id(&oa->o_oi), off,
2376                                                      pgp->count);
2377         }
2378
2379         /* brw mode can only be used at client */
2380         LASSERT(ed->ed_next != NULL);
2381         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2382
2383  out:
2384         if (rc != 0 || rw != OBD_BRW_READ)
2385                 verify = 0;
2386
2387         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2388                 if (pgp->pg == NULL)
2389                         continue;
2390
2391                 if (verify) {
2392                         int vrc;
2393                         vrc = echo_client_page_debug_check(pgp->pg,
2394                                                            ostid_id(&oa->o_oi),
2395                                                            pgp->off, pgp->count);
2396                         if (vrc != 0 && rc == 0)
2397                                 rc = vrc;
2398                 }
2399                 OBD_PAGE_FREE(pgp->pg);
2400         }
2401         OBD_FREE(pga, npages * sizeof(*pga));
2402         OBD_FREE(pages, npages * sizeof(*pages));
2403         RETURN(rc);
2404 }
2405
2406 static int echo_client_prep_commit(const struct lu_env *env,
2407                                    struct obd_export *exp, int rw,
2408                                    struct obdo *oa, struct echo_object *eco,
2409                                    u64 offset, u64 count,
2410                                    u64 batch, struct obd_trans_info *oti,
2411                                    int async)
2412 {
2413         struct obd_ioobj         ioo;
2414         struct niobuf_local     *lnb;
2415         struct niobuf_remote    *rnb;
2416         u64                      off;
2417         u64                      npages, tot_pages;
2418         int i, ret = 0, brw_flags = 0;
2419
2420         ENTRY;
2421
2422         if (count <= 0 || (count & ~PAGE_CACHE_MASK) != 0)
2423                 RETURN(-EINVAL);
2424
2425         npages = batch >> PAGE_CACHE_SHIFT;
2426         tot_pages = count >> PAGE_CACHE_SHIFT;
2427
2428         OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
2429         OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
2430
2431         if (lnb == NULL || rnb == NULL)
2432                 GOTO(out, ret = -ENOMEM);
2433
2434         if (rw == OBD_BRW_WRITE && async)
2435                 brw_flags |= OBD_BRW_ASYNC;
2436
2437         obdo_to_ioobj(oa, &ioo);
2438
2439         off = offset;
2440
2441         for(; tot_pages; tot_pages -= npages) {
2442                 int lpages;
2443
2444                 if (tot_pages < npages)
2445                         npages = tot_pages;
2446
2447                 for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
2448                         rnb[i].rnb_offset = off;
2449                         rnb[i].rnb_len = PAGE_CACHE_SIZE;
2450                         rnb[i].rnb_flags = brw_flags;
2451                 }
2452
2453                 ioo.ioo_bufcnt = npages;
2454
2455                 lpages = npages;
2456                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
2457                                  lnb, oti);
2458                 if (ret != 0)
2459                         GOTO(out, ret);
2460                 LASSERT(lpages == npages);
2461
2462                 for (i = 0; i < lpages; i++) {
2463                         struct page *page = lnb[i].lnb_page;
2464
2465                         /* read past eof? */
2466                         if (page == NULL && lnb[i].lnb_rc == 0)
2467                                 continue;
2468
2469                         if (async)
2470                                 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
2471
2472                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2473                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2474                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2475                                 continue;
2476
2477                         if (rw == OBD_BRW_WRITE)
2478                                 echo_client_page_debug_setup(page, rw,
2479                                                             ostid_id(&oa->o_oi),
2480                                                              rnb[i].rnb_offset,
2481                                                              rnb[i].rnb_len);
2482                         else
2483                                 echo_client_page_debug_check(page,
2484                                                             ostid_id(&oa->o_oi),
2485                                                              rnb[i].rnb_offset,
2486                                                              rnb[i].rnb_len);
2487                 }
2488
2489                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
2490                                    rnb, npages, lnb, oti, ret);
2491                 if (ret != 0)
2492                         GOTO(out, ret);
2493
2494                 /* Reset oti otherwise it would confuse ldiskfs. */
2495                 memset(oti, 0, sizeof(*oti));
2496
2497                 /* Reuse env context. */
2498                 lu_context_exit((struct lu_context *)&env->le_ctx);
2499                 lu_context_enter((struct lu_context *)&env->le_ctx);
2500         }
2501
2502 out:
2503         if (lnb)
2504                 OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
2505         if (rnb)
2506                 OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
2507         RETURN(ret);
2508 }
2509
2510 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2511                                  struct obd_export *exp,
2512                                  struct obd_ioctl_data *data,
2513                                  struct obd_trans_info *dummy_oti)
2514 {
2515         struct obd_device *obd = class_exp2obd(exp);
2516         struct echo_device *ed = obd2echo_dev(obd);
2517         struct echo_client_obd *ec = ed->ed_ec;
2518         struct obdo *oa = &data->ioc_obdo1;
2519         struct echo_object *eco;
2520         int rc;
2521         int async = 0;
2522         long test_mode;
2523         ENTRY;
2524
2525         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2526
2527         rc = echo_get_object(&eco, ed, oa);
2528         if (rc)
2529                 RETURN(rc);
2530
2531         oa->o_valid &= ~OBD_MD_FLHANDLE;
2532
2533         /* OFD/obdfilter works only via prep/commit */
2534         test_mode = (long)data->ioc_pbuf1;
2535         if (ed->ed_next == NULL && test_mode != 3) {
2536                 test_mode = 3;
2537                 data->ioc_plen1 = data->ioc_count;
2538         }
2539
2540         if (test_mode == 3)
2541                 async = 1;
2542
2543         /* Truncate batch size to maximum */
2544         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2545                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2546
2547         switch (test_mode) {
2548         case 1:
2549                 /* fall through */
2550         case 2:
2551                 rc = echo_client_kbrw(ed, rw, oa,
2552                                       eco, data->ioc_offset,
2553                                       data->ioc_count, async, dummy_oti);
2554                 break;
2555         case 3:
2556                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
2557                                              eco, data->ioc_offset,
2558                                              data->ioc_count, data->ioc_plen1,
2559                                              dummy_oti, async);
2560                 break;
2561         default:
2562                 rc = -EINVAL;
2563         }
2564         echo_put_object(eco);
2565         RETURN(rc);
2566 }
2567
2568 static int
2569 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2570                       void *karg, void *uarg)
2571 {
2572 #ifdef HAVE_SERVER_SUPPORT
2573         struct tgt_session_info *tsi;
2574 #endif
2575         struct obd_device      *obd = exp->exp_obd;
2576         struct echo_device     *ed = obd2echo_dev(obd);
2577         struct echo_client_obd *ec = ed->ed_ec;
2578         struct echo_object     *eco;
2579         struct obd_ioctl_data  *data = karg;
2580         struct obd_trans_info   dummy_oti;
2581         struct lu_env          *env;
2582         struct oti_req_ack_lock *ack_lock;
2583         struct obdo            *oa;
2584         struct lu_fid           fid;
2585         int                     rw = OBD_BRW_READ;
2586         int                     rc = 0;
2587         int                     i;
2588 #ifdef HAVE_SERVER_SUPPORT
2589         struct lu_context        echo_session;
2590 #endif
2591         ENTRY;
2592
2593         memset(&dummy_oti, 0, sizeof(dummy_oti));
2594
2595         oa = &data->ioc_obdo1;
2596         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2597                 oa->o_valid |= OBD_MD_FLGROUP;
2598                 ostid_set_seq_echo(&oa->o_oi);
2599         }
2600
2601         /* This FID is unpacked just for validation at this point */
2602         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2603         if (rc < 0)
2604                 RETURN(rc);
2605
2606         OBD_ALLOC_PTR(env);
2607         if (env == NULL)
2608                 RETURN(-ENOMEM);
2609
2610         rc = lu_env_init(env, LCT_DT_THREAD);
2611         if (rc)
2612                 GOTO(out_alloc, rc = -ENOMEM);
2613
2614 #ifdef HAVE_SERVER_SUPPORT
2615         env->le_ses = &echo_session;
2616         rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
2617         if (unlikely(rc < 0))
2618                 GOTO(out_env, rc);
2619         lu_context_enter(env->le_ses);
2620
2621         tsi = tgt_ses_info(env);
2622         tsi->tsi_exp = ec->ec_exp;
2623         tsi->tsi_jobid = NULL;
2624 #endif
2625         switch (cmd) {
2626         case OBD_IOC_CREATE:                    /* may create echo object */
2627                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2628                         GOTO (out, rc = -EPERM);
2629
2630                 rc = echo_create_object(env, ed, oa, &dummy_oti);
2631                 GOTO(out, rc);
2632
2633 #ifdef HAVE_SERVER_SUPPORT
2634         case OBD_IOC_ECHO_MD: {
2635                 int count;
2636                 int cmd;
2637                 char *dir = NULL;
2638                 int dirlen;
2639                 __u64 id;
2640
2641                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2642                         GOTO(out, rc = -EPERM);
2643
2644                 count = data->ioc_count;
2645                 cmd = data->ioc_command;
2646
2647                 id = data->ioc_obdo2.o_oi.oi.oi_id;
2648                 dirlen = data->ioc_plen1;
2649                 OBD_ALLOC(dir, dirlen + 1);
2650                 if (dir == NULL)
2651                         GOTO(out, rc = -ENOMEM);
2652
2653                 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2654                         OBD_FREE(dir, data->ioc_plen1 + 1);
2655                         GOTO(out, rc = -EFAULT);
2656                 }
2657
2658                 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2659                 OBD_FREE(dir, dirlen + 1);
2660                 GOTO(out, rc);
2661         }
2662         case OBD_IOC_ECHO_ALLOC_SEQ: {
2663                 struct lu_env   *cl_env;
2664                 int              refcheck;
2665                 __u64            seq;
2666                 int              max_count;
2667
2668                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2669                         GOTO(out, rc = -EPERM);
2670
2671                 cl_env = cl_env_get(&refcheck);
2672                 if (IS_ERR(cl_env))
2673                         GOTO(out, rc = PTR_ERR(cl_env));
2674
2675                 rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
2676                                             ECHO_MD_SES_TAG);
2677                 if (rc != 0) {
2678                         cl_env_put(cl_env, &refcheck);
2679                         GOTO(out, rc);
2680                 }
2681
2682                 rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
2683                 cl_env_put(cl_env, &refcheck);
2684                 if (rc < 0) {
2685                         CERROR("%s: Can not alloc seq: rc = %d\n",
2686                                obd->obd_name, rc);
2687                         GOTO(out, rc);
2688                 }
2689
2690                 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2691                         return -EFAULT;
2692
2693                 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2694                 if (copy_to_user(data->ioc_pbuf2, &max_count,
2695                                      data->ioc_plen2))
2696                         return -EFAULT;
2697                 GOTO(out, rc);
2698         }
2699 #endif /* HAVE_SERVER_SUPPORT */
2700         case OBD_IOC_DESTROY:
2701                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2702                         GOTO (out, rc = -EPERM);
2703
2704                 rc = echo_get_object(&eco, ed, oa);
2705                 if (rc == 0) {
2706                         rc = obd_destroy(env, ec->ec_exp, oa, &dummy_oti);
2707                         if (rc == 0)
2708                                 eco->eo_deleted = 1;
2709                         echo_put_object(eco);
2710                 }
2711                 GOTO(out, rc);
2712
2713         case OBD_IOC_GETATTR:
2714                 rc = echo_get_object(&eco, ed, oa);
2715                 if (rc == 0) {
2716                         struct obd_info oinfo = {
2717                                 .oi_oa = oa,
2718                         };
2719
2720                         rc = obd_getattr(env, ec->ec_exp, &oinfo);
2721                         echo_put_object(eco);
2722                 }
2723                 GOTO(out, rc);
2724
2725         case OBD_IOC_SETATTR:
2726                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2727                         GOTO (out, rc = -EPERM);
2728
2729                 rc = echo_get_object(&eco, ed, oa);
2730                 if (rc == 0) {
2731                         struct obd_info oinfo = {
2732                                 .oi_oa = oa,
2733                         };
2734
2735                         rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
2736                         echo_put_object(eco);
2737                 }
2738                 GOTO(out, rc);
2739
2740         case OBD_IOC_BRW_WRITE:
2741                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2742                         GOTO (out, rc = -EPERM);
2743
2744                 rw = OBD_BRW_WRITE;
2745                 /* fall through */
2746         case OBD_IOC_BRW_READ:
2747                 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
2748                 GOTO(out, rc);
2749
2750         default:
2751                 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2752                 GOTO (out, rc = -ENOTTY);
2753         }
2754
2755         EXIT;
2756 out:
2757 #ifdef HAVE_SERVER_SUPPORT
2758         lu_context_exit(env->le_ses);
2759         lu_context_fini(env->le_ses);
2760 out_env:
2761 #endif
2762         lu_env_fini(env);
2763 out_alloc:
2764         OBD_FREE_PTR(env);
2765
2766         /* XXX this should be in a helper also called by target_send_reply */
2767         for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
2768              i++, ack_lock++) {
2769                 if (!ack_lock->mode)
2770                         break;
2771                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
2772         }
2773
2774         return rc;
2775 }
2776
2777 static int echo_client_setup(const struct lu_env *env,
2778                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2779 {
2780         struct echo_client_obd *ec = &obddev->u.echo_client;
2781         struct obd_device *tgt;
2782         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2783         struct obd_connect_data *ocd = NULL;
2784         int rc;
2785         ENTRY;
2786
2787         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2788                 CERROR("requires a TARGET OBD name\n");
2789                 RETURN(-EINVAL);
2790         }
2791
2792         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2793         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2794                 CERROR("device not attached or not set up (%s)\n",
2795                        lustre_cfg_string(lcfg, 1));
2796                 RETURN(-EINVAL);
2797         }
2798
2799         spin_lock_init(&ec->ec_lock);
2800         INIT_LIST_HEAD(&ec->ec_objects);
2801         INIT_LIST_HEAD(&ec->ec_locks);
2802         ec->ec_unique = 0;
2803
2804         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2805 #ifdef HAVE_SERVER_SUPPORT
2806                 lu_context_tags_update(ECHO_MD_CTX_TAG);
2807                 lu_session_tags_update(ECHO_MD_SES_TAG);
2808 #else
2809                 CERROR("Local operations are NOT supported on client side. "
2810                        "Only remote operations are supported. Metadata client "
2811                        "must be run on server side.\n");
2812 #endif
2813                 RETURN(0);
2814         }
2815
2816         OBD_ALLOC(ocd, sizeof(*ocd));
2817         if (ocd == NULL) {
2818                 CERROR("Can't alloc ocd connecting to %s\n",
2819                        lustre_cfg_string(lcfg, 1));
2820                 return -ENOMEM;
2821         }
2822
2823         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2824                                  OBD_CONNECT_BRW_SIZE |
2825                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2826                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2827                                  OBD_CONNECT_FID;
2828         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2829         ocd->ocd_version = LUSTRE_VERSION_CODE;
2830         ocd->ocd_group = FID_SEQ_ECHO;
2831
2832         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2833         if (rc == 0) {
2834                 /* Turn off pinger because it connects to tgt obd directly. */
2835                 spin_lock(&tgt->obd_dev_lock);
2836                 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2837                 spin_unlock(&tgt->obd_dev_lock);
2838         }
2839
2840         OBD_FREE(ocd, sizeof(*ocd));
2841
2842         if (rc != 0) {
2843                 CERROR("fail to connect to device %s\n",
2844                        lustre_cfg_string(lcfg, 1));
2845                 return (rc);
2846         }
2847
2848         RETURN(rc);
2849 }
2850
2851 static int echo_client_cleanup(struct obd_device *obddev)
2852 {
2853         struct echo_device *ed = obd2echo_dev(obddev);
2854         struct echo_client_obd *ec = &obddev->u.echo_client;
2855         int rc;
2856         ENTRY;
2857
2858         /*Do nothing for Metadata echo client*/
2859         if (ed == NULL )
2860                 RETURN(0);
2861
2862         if (ed->ed_next_ismd) {
2863 #ifdef HAVE_SERVER_SUPPORT
2864                 lu_context_tags_clear(ECHO_MD_CTX_TAG);
2865                 lu_session_tags_clear(ECHO_MD_SES_TAG);
2866 #else
2867                 CERROR("This is client-side only module, does not support "
2868                         "metadata echo client.\n");
2869 #endif
2870                 RETURN(0);
2871         }
2872
2873         if (!list_empty(&obddev->obd_exports)) {
2874                 CERROR("still has clients!\n");
2875                 RETURN(-EBUSY);
2876         }
2877
2878         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2879         rc = obd_disconnect(ec->ec_exp);
2880         if (rc != 0)
2881                 CERROR("fail to disconnect device: %d\n", rc);
2882
2883         RETURN(rc);
2884 }
2885
2886 static int echo_client_connect(const struct lu_env *env,
2887                                struct obd_export **exp,
2888                                struct obd_device *src, struct obd_uuid *cluuid,
2889                                struct obd_connect_data *data, void *localdata)
2890 {
2891         int                rc;
2892         struct lustre_handle conn = { 0 };
2893
2894         ENTRY;
2895         rc = class_connect(&conn, src, cluuid);
2896         if (rc == 0) {
2897                 *exp = class_conn2export(&conn);
2898         }
2899
2900         RETURN (rc);
2901 }
2902
2903 static int echo_client_disconnect(struct obd_export *exp)
2904 {
2905         int                     rc;
2906         ENTRY;
2907
2908         if (exp == NULL)
2909                 GOTO(out, rc = -EINVAL);
2910
2911         rc = class_disconnect(exp);
2912         GOTO(out, rc);
2913  out:
2914         return rc;
2915 }
2916
2917 static struct obd_ops echo_client_obd_ops = {
2918         .o_owner       = THIS_MODULE,
2919         .o_iocontrol   = echo_client_iocontrol,
2920         .o_connect     = echo_client_connect,
2921         .o_disconnect  = echo_client_disconnect
2922 };
2923
2924 static int echo_client_init(void)
2925 {
2926         int rc;
2927
2928         rc = lu_kmem_init(echo_caches);
2929         if (rc == 0) {
2930                 rc = class_register_type(&echo_client_obd_ops, NULL, true, NULL,
2931                                          LUSTRE_ECHO_CLIENT_NAME,
2932                                          &echo_device_type);
2933                 if (rc)
2934                         lu_kmem_fini(echo_caches);
2935         }
2936         return rc;
2937 }
2938
2939 static void echo_client_exit(void)
2940 {
2941         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2942         lu_kmem_fini(echo_caches);
2943 }
2944
2945 static int __init obdecho_init(void)
2946 {
2947         int rc;
2948
2949         ENTRY;
2950         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2951
2952         LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2953
2954 # ifdef HAVE_SERVER_SUPPORT
2955         rc = echo_persistent_pages_init();
2956         if (rc != 0)
2957                 goto failed_0;
2958
2959         rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
2960                                  LUSTRE_ECHO_NAME, NULL);
2961         if (rc != 0)
2962                 goto failed_1;
2963 # endif
2964
2965         rc = echo_client_init();
2966
2967 # ifdef HAVE_SERVER_SUPPORT
2968         if (rc == 0)
2969                 RETURN(0);
2970
2971         class_unregister_type(LUSTRE_ECHO_NAME);
2972 failed_1:
2973         echo_persistent_pages_fini();
2974 failed_0:
2975 # endif
2976         RETURN(rc);
2977 }
2978
/**
 * Module teardown for the echo OBD driver: shut down the echo client, then,
 * on server builds, unregister the echo obd type and free the persistent
 * pages.
 */
static void /*__exit*/ obdecho_exit(void)
{
        echo_client_exit();

#ifdef HAVE_SERVER_SUPPORT
        /* server-side pieces set up by obdecho_init() */
        class_unregister_type(LUSTRE_ECHO_NAME);
        echo_persistent_pages_fini();
#endif
}
2988
2989 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2990 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
2991 MODULE_LICENSE("GPL");
2992
2993 cfs_module(obdecho, LUSTRE_VERSION_STRING, obdecho_init, obdecho_exit);
2994
2995 /** @} echo_client */