Whamcloud - gitweb
LU-12578 obdecho: reuse an cl env cache for obdecho survey
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_ECHO
34
35 #include <linux/user_namespace.h>
36 #ifdef HAVE_UIDGID_HEADER
37 # include <linux/uidgid.h>
38 #endif
39 #include <libcfs/libcfs.h>
40
41 #include <obd.h>
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <lustre_debug.h>
45 #include <lprocfs_status.h>
46 #include <cl_object.h>
47 #include <lustre_fid.h>
48 #include <lustre_lmv.h>
49 #include <lustre_acl.h>
50 #include <uapi/linux/lustre/lustre_ioctl.h>
51 #include <lustre_net.h>
52 #ifdef HAVE_SERVER_SUPPORT
53 # include <md_object.h>
54
55 #define ETI_NAME_LEN    20
56
57 #endif /* HAVE_SERVER_SUPPORT */
58
59 #include "echo_internal.h"
60
61 /** \defgroup echo_client Echo Client
62  * @{
63  */
64
/* The echo thread key has the LCT_CL_THREAD tag, so it is set up directly by the cl_env functions. */
66 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
67 #define ECHO_DT_CTX_TAG (LCT_REMEMBER | LCT_DT_THREAD)
68 #define ECHO_SES_TAG    (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
69
70 struct echo_device {
71         struct cl_device          ed_cl;
72         struct echo_client_obd   *ed_ec;
73
74         struct cl_site            ed_site_myself;
75         struct lu_site           *ed_site;
76         struct lu_device         *ed_next;
77         int                       ed_next_ismd;
78         struct lu_client_seq     *ed_cl_seq;
79 #ifdef HAVE_SERVER_SUPPORT
80         struct local_oid_storage *ed_los;
81         struct lu_fid             ed_root_fid;
82 #endif /* HAVE_SERVER_SUPPORT */
83 };
84
85 struct echo_object {
86         struct cl_object        eo_cl;
87         struct cl_object_header eo_hdr;
88         struct echo_device     *eo_dev;
89         struct list_head        eo_obj_chain;
90         struct lov_oinfo       *eo_oinfo;
91         atomic_t                eo_npages;
92         int                     eo_deleted;
93 };
94
95 struct echo_object_conf {
96         struct cl_object_conf   eoc_cl;
97         struct lov_oinfo      **eoc_oinfo;
98 };
99
100 struct echo_page {
101         struct cl_page_slice    ep_cl;
102         unsigned long           ep_lock;
103 };
104
105 struct echo_lock {
106         struct cl_lock_slice    el_cl;
107         struct list_head        el_chain;
108         struct echo_object     *el_object;
109         __u64                   el_cookie;
110         atomic_t                el_refcount;
111 };
112
113 #ifdef HAVE_SERVER_SUPPORT
114 static const char echo_md_root_dir_name[] = "ROOT_ECHO";
115
116 /**
117  * In order to use the values of members in struct mdd_device,
118  * we define an alias structure here.
119  */
120 struct echo_md_device {
121         struct md_device                 emd_md_dev;
122         struct obd_export               *emd_child_exp;
123         struct dt_device                *emd_child;
124         struct dt_device                *emd_bottom;
125         struct lu_fid                    emd_root_fid;
126         struct lu_fid                    emd_local_root_fid;
127 };
128 #endif /* HAVE_SERVER_SUPPORT */
129
130 static int echo_client_setup(const struct lu_env *env,
131                              struct obd_device *obddev,
132                              struct lustre_cfg *lcfg);
133 static int echo_client_cleanup(struct obd_device *obddev);
134
135 /** \defgroup echo_helpers Helper functions
136  * @{
137  */
138 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
139 {
140         return container_of0(dev, struct echo_device, ed_cl);
141 }
142
143 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
144 {
145         return &d->ed_cl;
146 }
147
148 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
149 {
150         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
151 }
152
153 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
154 {
155         return &eco->eo_cl;
156 }
157
158 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
159 {
160         return container_of(o, struct echo_object, eo_cl);
161 }
162
163 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
164 {
165         return container_of(s, struct echo_page, ep_cl);
166 }
167
168 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
169 {
170         return container_of(s, struct echo_lock, el_cl);
171 }
172
173 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
174 {
175         return ecl->el_cl.cls_lock;
176 }
177
178 static struct lu_context_key echo_thread_key;
179
180 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
181 {
182         struct echo_thread_info *info;
183
184         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
185         LASSERT(info != NULL);
186         return info;
187 }
188
189 static inline
190 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
191 {
192         return container_of(c, struct echo_object_conf, eoc_cl);
193 }
194
195 #ifdef HAVE_SERVER_SUPPORT
196 static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
197 {
198         return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
199 }
200
201 static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
202 {
203         return &d->emd_md_dev.md_lu_dev;
204 }
205
206 static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
207 {
208         return emd2lu_dev(d)->ld_site->ld_seq_site;
209 }
210
211 static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
212 {
213         return d->emd_md_dev.md_lu_dev.ld_obd;
214 }
215 #endif /* HAVE_SERVER_SUPPORT */
216
217 /** @} echo_helpers */
218
219 static int cl_echo_object_put(struct echo_object *eco);
220 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
221                               struct page **pages, int npages, int async);
222
223 struct echo_thread_info {
224         struct echo_object_conf eti_conf;
225         struct lustre_md        eti_md;
226         struct cl_2queue        eti_queue;
227         struct cl_io            eti_io;
228         struct cl_lock          eti_lock;
229         struct lu_fid           eti_fid;
230         struct lu_fid           eti_fid2;
231 #ifdef HAVE_SERVER_SUPPORT
232         struct md_op_spec       eti_spec;
233         struct lov_mds_md_v3    eti_lmm;
234         struct lov_user_md_v3   eti_lum;
235         struct md_attr          eti_ma;
236         struct lu_name          eti_lname;
237         /* per-thread values, can be re-used */
238         void                    *eti_big_lmm; /* may be vmalloc'd */
239         int                     eti_big_lmmsize;
240         char                    eti_name[ETI_NAME_LEN];
241         struct lu_buf           eti_buf;
242         /* If we want to test large ACL, then need to enlarge the buffer. */
243         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE_OLD];
244 #endif
245 };
246
/*
 * Session data, allocated from echo_session_kmem.
 * No session used right now, so it only holds a placeholder member.
 */
struct echo_session_info {
        unsigned long dummy;
};
251
252 static struct kmem_cache *echo_lock_kmem;
253 static struct kmem_cache *echo_object_kmem;
254 static struct kmem_cache *echo_thread_kmem;
255 static struct kmem_cache *echo_session_kmem;
256 /* static struct kmem_cache *echo_req_kmem; */
257
258 static struct lu_kmem_descr echo_caches[] = {
259         {
260                 .ckd_cache = &echo_lock_kmem,
261                 .ckd_name  = "echo_lock_kmem",
262                 .ckd_size  = sizeof(struct echo_lock)
263         },
264         {
265                 .ckd_cache = &echo_object_kmem,
266                 .ckd_name  = "echo_object_kmem",
267                 .ckd_size  = sizeof(struct echo_object)
268         },
269         {
270                 .ckd_cache = &echo_thread_kmem,
271                 .ckd_name  = "echo_thread_kmem",
272                 .ckd_size  = sizeof(struct echo_thread_info)
273         },
274         {
275                 .ckd_cache = &echo_session_kmem,
276                 .ckd_name  = "echo_session_kmem",
277                 .ckd_size  = sizeof(struct echo_session_info)
278         },
279         {
280                 .ckd_cache = NULL
281         }
282 };
283
284 /** \defgroup echo_page Page operations
285  *
286  * Echo page operations.
287  *
288  * @{
289  */
290 static int echo_page_own(const struct lu_env *env,
291                          const struct cl_page_slice *slice,
292                          struct cl_io *io, int nonblock)
293 {
294         struct echo_page *ep = cl2echo_page(slice);
295
296         if (!nonblock) {
297                 if (test_and_set_bit(0, &ep->ep_lock))
298                         return -EAGAIN;
299         } else {
300                 while (test_and_set_bit(0, &ep->ep_lock))
301                         wait_on_bit(&ep->ep_lock, 0, TASK_UNINTERRUPTIBLE);
302         }
303         return 0;
304 }
305
306 static void echo_page_disown(const struct lu_env *env,
307                              const struct cl_page_slice *slice,
308                              struct cl_io *io)
309 {
310         struct echo_page *ep = cl2echo_page(slice);
311
312         LASSERT(test_bit(0, &ep->ep_lock));
313         clear_and_wake_up_bit(0, &ep->ep_lock);
314 }
315
316 static void echo_page_discard(const struct lu_env *env,
317                               const struct cl_page_slice *slice,
318                               struct cl_io *unused)
319 {
320         cl_page_delete(env, slice->cpl_page);
321 }
322
323 static int echo_page_is_vmlocked(const struct lu_env *env,
324                                  const struct cl_page_slice *slice)
325 {
326         if (test_bit(0, &cl2echo_page(slice)->ep_lock))
327                 return -EBUSY;
328         return -ENODATA;
329 }
330
331 static void echo_page_completion(const struct lu_env *env,
332                                  const struct cl_page_slice *slice,
333                                  int ioret)
334 {
335         LASSERT(slice->cpl_page->cp_sync_io != NULL);
336 }
337
338 static void echo_page_fini(const struct lu_env *env,
339                            struct cl_page_slice *slice,
340                            struct pagevec *pvec)
341 {
342         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
343
344         ENTRY;
345         atomic_dec(&eco->eo_npages);
346         put_page(slice->cpl_page->cp_vmpage);
347         EXIT;
348 }
349
/** cl_page_operations::cpo_prep() for read and write: always succeeds. */
static int echo_page_prep(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          struct cl_io *unused)
{
        /* no preparation is needed for echo pages */
        return 0;
}
356
357 static int echo_page_print(const struct lu_env *env,
358                            const struct cl_page_slice *slice,
359                            void *cookie, lu_printer_t printer)
360 {
361         struct echo_page *ep = cl2echo_page(slice);
362
363         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
364                    ep, test_bit(0, &ep->ep_lock),
365                    slice->cpl_page->cp_vmpage);
366         return 0;
367 }
368
369 static const struct cl_page_operations echo_page_ops = {
370         .cpo_own           = echo_page_own,
371         .cpo_disown        = echo_page_disown,
372         .cpo_discard       = echo_page_discard,
373         .cpo_fini          = echo_page_fini,
374         .cpo_print         = echo_page_print,
375         .cpo_is_vmlocked   = echo_page_is_vmlocked,
376         .io = {
377                 [CRT_READ] = {
378                         .cpo_prep        = echo_page_prep,
379                         .cpo_completion  = echo_page_completion,
380                 },
381                 [CRT_WRITE] = {
382                         .cpo_prep        = echo_page_prep,
383                         .cpo_completion  = echo_page_completion,
384                 }
385         }
386 };
387
388 /** @} echo_page */
389
390 /** \defgroup echo_lock Locking
391  *
392  * echo lock operations
393  *
394  * @{
395  */
396 static void echo_lock_fini(const struct lu_env *env,
397                            struct cl_lock_slice *slice)
398 {
399         struct echo_lock *ecl = cl2echo_lock(slice);
400
401         LASSERT(list_empty(&ecl->el_chain));
402         OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
403 }
404
405 static struct cl_lock_operations echo_lock_ops = {
406         .clo_fini      = echo_lock_fini,
407 };
408
409 /** @} echo_lock */
410
411 /** \defgroup echo_cl_ops cl_object operations
412  *
413  * operations for cl_object
414  *
415  * @{
416  */
417 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
418                           struct cl_page *page, pgoff_t index)
419 {
420         struct echo_page *ep = cl_object_page_slice(obj, page);
421         struct echo_object *eco = cl2echo_obj(obj);
422
423         ENTRY;
424         get_page(page->cp_vmpage);
425         /*
426          * ep_lock is similar to the lock_page() lock, and
427          * cannot usefully be monitored by lockdep.
428          * So just use a bit in an "unsigned long" and use the
429          * wait_on_bit() interface to wait for the bit to be clear.
430          */
431         ep->ep_lock = 0;
432         cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
433         atomic_inc(&eco->eo_npages);
434         RETURN(0);
435 }
436
/** cl_object_operations::coo_io_init(): nothing to set up, returns 0. */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
                        struct cl_io *io)
{
        /* the echo layer keeps no per-IO state */
        return 0;
}
442
443 static int echo_lock_init(const struct lu_env *env,
444                           struct cl_object *obj, struct cl_lock *lock,
445                           const struct cl_io *unused)
446 {
447         struct echo_lock *el;
448
449         ENTRY;
450         OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
451         if (el) {
452                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
453                 el->el_object = cl2echo_obj(obj);
454                 INIT_LIST_HEAD(&el->el_chain);
455                 atomic_set(&el->el_refcount, 0);
456         }
457         RETURN(el ? 0 : -ENOMEM);
458 }
459
/** cl_object_operations::coo_conf_set(): ignores \a conf and returns 0. */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
                         const struct cl_object_conf *conf)
{
        /* nothing is reconfigured on an existing echo object */
        return 0;
}
465
466 static const struct cl_object_operations echo_cl_obj_ops = {
467         .coo_page_init = echo_page_init,
468         .coo_lock_init = echo_lock_init,
469         .coo_io_init   = echo_io_init,
470         .coo_conf_set  = echo_conf_set
471 };
472 /** @} echo_cl_ops */
473
474 /** \defgroup echo_lu_ops lu_object operations
475  *
476  * operations for echo lu object.
477  *
478  * @{
479  */
480 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
481                             const struct lu_object_conf *conf)
482 {
483         struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
484         struct echo_client_obd *ec     = ed->ed_ec;
485         struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
486
487         ENTRY;
488         if (ed->ed_next) {
489                 struct lu_object  *below;
490                 struct lu_device  *under;
491
492                 under = ed->ed_next;
493                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
494                                                         under);
495                 if (!below)
496                         RETURN(-ENOMEM);
497                 lu_object_add(obj, below);
498         }
499
500         if (!ed->ed_next_ismd) {
501                 const struct cl_object_conf *cconf = lu2cl_conf(conf);
502                 struct echo_object_conf *econf = cl2echo_conf(cconf);
503
504                 LASSERT(econf->eoc_oinfo != NULL);
505
506                 /*
507                  * Transfer the oinfo pointer to eco that it won't be
508                  * freed.
509                  */
510                 eco->eo_oinfo = *econf->eoc_oinfo;
511                 *econf->eoc_oinfo = NULL;
512         } else {
513                 eco->eo_oinfo = NULL;
514         }
515
516         eco->eo_dev = ed;
517         atomic_set(&eco->eo_npages, 0);
518         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
519
520         spin_lock(&ec->ec_lock);
521         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
522         spin_unlock(&ec->ec_lock);
523
524         RETURN(0);
525 }
526
527 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
528 {
529         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
530         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
531
532         ENTRY;
533         LASSERT(atomic_read(&eco->eo_npages) == 0);
534
535         spin_lock(&ec->ec_lock);
536         list_del_init(&eco->eo_obj_chain);
537         spin_unlock(&ec->ec_lock);
538
539         lu_object_fini(obj);
540         lu_object_header_fini(obj->lo_header);
541
542         if (eco->eo_oinfo)
543                 OBD_FREE_PTR(eco->eo_oinfo);
544
545         OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
546         EXIT;
547 }
548
549 static int echo_object_print(const struct lu_env *env, void *cookie,
550                              lu_printer_t p, const struct lu_object *o)
551 {
552         struct echo_object *obj = cl2echo_obj(lu2cl(o));
553
554         return (*p)(env, cookie, "echoclient-object@%p", obj);
555 }
556
557 static const struct lu_object_operations echo_lu_obj_ops = {
558         .loo_object_init      = echo_object_init,
559         .loo_object_delete    = NULL,
560         .loo_object_release   = NULL,
561         .loo_object_free      = echo_object_free,
562         .loo_object_print     = echo_object_print,
563         .loo_object_invariant = NULL
564 };
565 /** @} echo_lu_ops */
566
567 /** \defgroup echo_lu_dev_ops  lu_device operations
568  *
569  * Operations for echo lu device.
570  *
571  * @{
572  */
573 static struct lu_object *echo_object_alloc(const struct lu_env *env,
574                                            const struct lu_object_header *hdr,
575                                            struct lu_device *dev)
576 {
577         struct echo_object *eco;
578         struct lu_object *obj = NULL;
579
580         ENTRY;
581         /* we're the top dev. */
582         LASSERT(hdr == NULL);
583         OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
584         if (eco) {
585                 struct cl_object_header *hdr = &eco->eo_hdr;
586
587                 obj = &echo_obj2cl(eco)->co_lu;
588                 cl_object_header_init(hdr);
589                 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
590
591                 lu_object_init(obj, &hdr->coh_lu, dev);
592                 lu_object_add_top(&hdr->coh_lu, obj);
593
594                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
595                 obj->lo_ops       = &echo_lu_obj_ops;
596         }
597         RETURN(obj);
598 }
599
600 static struct lu_device_operations echo_device_lu_ops = {
601         .ldo_object_alloc   = echo_object_alloc,
602 };
603
604 /** @} echo_lu_dev_ops */
605
606 /** \defgroup echo_init Setup and teardown
607  *
608  * Init and fini functions for echo client.
609  *
610  * @{
611  */
612 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
613 {
614         struct cl_site *site = &ed->ed_site_myself;
615         int rc;
616
617         /* initialize site */
618         rc = cl_site_init(site, &ed->ed_cl);
619         if (rc) {
620                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
621                 return rc;
622         }
623
624         rc = lu_site_init_finish(&site->cs_lu);
625         if (rc) {
626                 cl_site_fini(site);
627                 return rc;
628         }
629
630         ed->ed_site = &site->cs_lu;
631         return 0;
632 }
633
634 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
635 {
636         if (ed->ed_site) {
637                 if (!ed->ed_next_ismd)
638                         lu_site_fini(ed->ed_site);
639                 ed->ed_site = NULL;
640         }
641 }
642
643 static void *echo_thread_key_init(const struct lu_context *ctx,
644                                   struct lu_context_key *key)
645 {
646         struct echo_thread_info *info;
647
648         OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
649         if (!info)
650                 info = ERR_PTR(-ENOMEM);
651         return info;
652 }
653
654 static void echo_thread_key_fini(const struct lu_context *ctx,
655                                  struct lu_context_key *key, void *data)
656 {
657         struct echo_thread_info *info = data;
658
659         OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
660 }
661
662 static struct lu_context_key echo_thread_key = {
663         .lct_tags = LCT_CL_THREAD,
664         .lct_init = echo_thread_key_init,
665         .lct_fini = echo_thread_key_fini,
666 };
667
668 static void *echo_session_key_init(const struct lu_context *ctx,
669                                   struct lu_context_key *key)
670 {
671         struct echo_session_info *session;
672
673         OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
674         if (!session)
675                 session = ERR_PTR(-ENOMEM);
676         return session;
677 }
678
679 static void echo_session_key_fini(const struct lu_context *ctx,
680                                   struct lu_context_key *key, void *data)
681 {
682         struct echo_session_info *session = data;
683
684         OBD_SLAB_FREE_PTR(session, echo_session_kmem);
685 }
686
687 static struct lu_context_key echo_session_key = {
688         .lct_tags = LCT_SESSION,
689         .lct_init = echo_session_key_init,
690         .lct_fini = echo_session_key_fini,
691 };
692
693 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
694
695 #ifdef HAVE_SERVER_SUPPORT
696 # define ECHO_SEQ_WIDTH 0xffffffff
697 static int echo_fid_init(struct echo_device *ed, char *obd_name,
698                          struct seq_server_site *ss)
699 {
700         char *prefix;
701         int rc;
702
703         ENTRY;
704         OBD_ALLOC_PTR(ed->ed_cl_seq);
705         if (!ed->ed_cl_seq)
706                 RETURN(-ENOMEM);
707
708         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
709         if (!prefix)
710                 GOTO(out_free_seq, rc = -ENOMEM);
711
712         snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
713
714         /* Init client side sequence-manager */
715         rc = seq_client_init(ed->ed_cl_seq, NULL,
716                              LUSTRE_SEQ_METADATA,
717                              prefix, ss->ss_server_seq);
718         ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
719         OBD_FREE(prefix, MAX_OBD_NAME + 5);
720         if (rc)
721                 GOTO(out_free_seq, rc);
722
723         RETURN(0);
724
725 out_free_seq:
726         OBD_FREE_PTR(ed->ed_cl_seq);
727         ed->ed_cl_seq = NULL;
728         RETURN(rc);
729 }
730
731 static int echo_fid_fini(struct obd_device *obddev)
732 {
733         struct echo_device *ed = obd2echo_dev(obddev);
734
735         ENTRY;
736         if (ed->ed_cl_seq) {
737                 seq_client_fini(ed->ed_cl_seq);
738                 OBD_FREE_PTR(ed->ed_cl_seq);
739                 ed->ed_cl_seq = NULL;
740         }
741
742         RETURN(0);
743 }
744
745 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
746 {
747         ENTRY;
748         if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
749                 local_oid_storage_fini(env, ed->ed_los);
750                 ed->ed_los = NULL;
751         }
752 }
753
754 static int
755 echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
756                           struct local_oid_storage *los,
757                           const struct lu_fid *pfid, const char *name,
758                           __u32 mode, struct lu_fid *fid)
759 {
760         struct dt_object        *parent = NULL;
761         struct dt_object        *dto = NULL;
762         int                      rc = 0;
763
764         ENTRY;
765         LASSERT(!fid_is_zero(pfid));
766         parent = dt_locate(env, emd->emd_bottom, pfid);
767         if (unlikely(IS_ERR(parent)))
768                 RETURN(PTR_ERR(parent));
769
770         /* create local file with @fid */
771         dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
772                                                  parent, name, mode);
773         if (IS_ERR(dto))
774                 GOTO(out_put, rc = PTR_ERR(dto));
775
776         *fid = *lu_object_fid(&dto->do_lu);
777         /*
778          * since stack is not fully set up the local_storage uses own stack
779          * and we should drop its object from cache
780          */
781         dt_object_put_nocache(env, dto);
782
783         EXIT;
784 out_put:
785         dt_object_put(env, parent);
786         RETURN(rc);
787 }
788
789 static int
790 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
791                  struct echo_device *ed)
792 {
793         struct lu_fid fid;
794         int rc = 0;
795
796         ENTRY;
797         /* Setup local dirs */
798         fid.f_seq = FID_SEQ_LOCAL_NAME;
799         fid.f_oid = 1;
800         fid.f_ver = 0;
801         rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
802         if (rc != 0)
803                 RETURN(rc);
804
805         lu_echo_root_fid(&fid);
806         if (echo_md_seq_site(emd)->ss_node_id == 0) {
807                 rc = echo_md_local_file_create(env, emd, ed->ed_los,
808                                                &emd->emd_local_root_fid,
809                                                echo_md_root_dir_name, S_IFDIR |
810                                                S_IRUGO | S_IWUSR | S_IXUGO,
811                                                &fid);
812                 if (rc != 0) {
813                         CERROR("%s: create md echo root fid failed: rc = %d\n",
814                                emd2obd_dev(emd)->obd_name, rc);
815                         GOTO(out_los, rc);
816                 }
817         }
818         ed->ed_root_fid = fid;
819
820         RETURN(0);
821 out_los:
822         echo_ed_los_fini(env, ed);
823
824         RETURN(rc);
825 }
826 #endif /* HAVE_SERVER_SUPPORT */
827
828 static struct lu_device *echo_device_alloc(const struct lu_env *env,
829                                            struct lu_device_type *t,
830                                            struct lustre_cfg *cfg)
831 {
832         struct lu_device   *next;
833         struct echo_device *ed;
834         struct cl_device   *cd;
835         struct obd_device  *obd = NULL; /* to keep compiler happy */
836         struct obd_device  *tgt;
837         const char *tgt_type_name;
838         int rc;
839         int cleanup = 0;
840
841         ENTRY;
842         OBD_ALLOC_PTR(ed);
843         if (!ed)
844                 GOTO(out, rc = -ENOMEM);
845
846         cleanup = 1;
847         cd = &ed->ed_cl;
848         rc = cl_device_init(cd, t);
849         if (rc)
850                 GOTO(out, rc);
851
852         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
853
854         cleanup = 2;
855         obd = class_name2obd(lustre_cfg_string(cfg, 0));
856         LASSERT(obd != NULL);
857         LASSERT(env != NULL);
858
859         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
860         if (!tgt) {
861                 CERROR("Can not find tgt device %s\n",
862                         lustre_cfg_string(cfg, 1));
863                 GOTO(out, rc = -ENODEV);
864         }
865
866         next = tgt->obd_lu_dev;
867
868         if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
869                 ed->ed_next_ismd = 1;
870         } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
871                    strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
872                 ed->ed_next_ismd = 0;
873                 rc = echo_site_init(env, ed);
874                 if (rc)
875                         GOTO(out, rc);
876         } else {
877                 GOTO(out, rc = -EINVAL);
878         }
879
880         cleanup = 3;
881
882         rc = echo_client_setup(env, obd, cfg);
883         if (rc)
884                 GOTO(out, rc);
885
886         ed->ed_ec = &obd->u.echo_client;
887         cleanup = 4;
888
889         if (ed->ed_next_ismd) {
890 #ifdef HAVE_SERVER_SUPPORT
891                 /* Suppose to connect to some Metadata layer */
892                 struct lu_site          *ls = NULL;
893                 struct lu_device        *ld = NULL;
894                 struct md_device        *md = NULL;
895                 struct echo_md_device   *emd = NULL;
896                 int                      found = 0;
897
898                 if (!next) {
899                         CERROR("%s is not lu device type!\n",
900                                lustre_cfg_string(cfg, 1));
901                         GOTO(out, rc = -EINVAL);
902                 }
903
904                 tgt_type_name = lustre_cfg_string(cfg, 2);
905                 if (!tgt_type_name) {
906                         CERROR("%s no type name for echo %s setup\n",
907                                 lustre_cfg_string(cfg, 1),
908                                 tgt->obd_type->typ_name);
909                         GOTO(out, rc = -EINVAL);
910                 }
911
912                 ls = next->ld_site;
913
914                 spin_lock(&ls->ls_ld_lock);
915                 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
916                         if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
917                                 found = 1;
918                                 break;
919                         }
920                 }
921                 spin_unlock(&ls->ls_ld_lock);
922
923                 if (found == 0) {
924                         CERROR("%s is not lu device type!\n",
925                                lustre_cfg_string(cfg, 1));
926                         GOTO(out, rc = -EINVAL);
927                 }
928
929                 next = ld;
930                 /* For MD echo client, it will use the site in MDS stack */
931                 ed->ed_site = ls;
932                 ed->ed_cl.cd_lu_dev.ld_site = ls;
933                 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
934                 if (rc) {
935                         CERROR("echo fid init error %d\n", rc);
936                         GOTO(out, rc);
937                 }
938
939                 md = lu2md_dev(next);
940                 emd = lu2emd_dev(&md->md_lu_dev);
941                 rc = echo_md_root_get(env, emd, ed);
942                 if (rc != 0) {
943                         CERROR("%s: get root error: rc = %d\n",
944                                 emd2obd_dev(emd)->obd_name, rc);
945                         GOTO(out, rc);
946                 }
947 #else /* !HAVE_SERVER_SUPPORT */
948                 CERROR(
949                        "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
950                 GOTO(out, rc = -EOPNOTSUPP);
951 #endif /* HAVE_SERVER_SUPPORT */
952         } else {
953                 /*
954                  * if echo client is to be stacked upon ost device, the next is
955                  * NULL since ost is not a clio device so far
956                  */
957                 if (next != NULL && !lu_device_is_cl(next))
958                         next = NULL;
959
960                 tgt_type_name = tgt->obd_type->typ_name;
961                 if (next) {
962                         LASSERT(next != NULL);
963                         if (next->ld_site)
964                                 GOTO(out, rc = -EBUSY);
965
966                         next->ld_site = ed->ed_site;
967                         rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
968                                                         next->ld_type->ldt_name,
969                                                         NULL);
970                         if (rc)
971                                 GOTO(out, rc);
972                 } else {
973                         LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
974                 }
975         }
976
977         ed->ed_next = next;
978         RETURN(&cd->cd_lu_dev);
979 out:
980         switch (cleanup) {
981         case 4: {
982                 int rc2;
983
984                 rc2 = echo_client_cleanup(obd);
985                 if (rc2)
986                         CERROR("Cleanup obd device %s error(%d)\n",
987                                obd->obd_name, rc2);
988         }
989
990         case 3:
991                 echo_site_fini(env, ed);
992         case 2:
993                 cl_device_fini(&ed->ed_cl);
994         case 1:
995                 OBD_FREE_PTR(ed);
996         case 0:
997         default:
998                 break;
999         }
1000         return ERR_PTR(rc);
1001 }
1002
/*
 * ldto_device_init method registered in echo_device_type_ops.
 *
 * The body hits LBUG() unconditionally, so this entry point is not meant to
 * be reached; the trailing return only satisfies the int return type.
 */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
                            const char *name, struct lu_device *next)
{
        LBUG();
        return 0;
}
1009
1010 static struct lu_device *echo_device_fini(const struct lu_env *env,
1011                                           struct lu_device *d)
1012 {
1013         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
1014         struct lu_device *next = ed->ed_next;
1015
1016         while (next && !ed->ed_next_ismd)
1017                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
1018         return NULL;
1019 }
1020
/*
 * Release the cl_lock that backs the echo lock @ecl.
 *
 * @still_used is accepted for the caller's convenience but is not consulted
 * here.
 */
static void echo_lock_release(const struct lu_env *env,
                              struct echo_lock *ecl,
                              int still_used)
{
        cl_lock_release(env, echo_lock2cl(ecl));
}
1029
/*
 * ldto_device_free method: tear down and free an echo device.
 *
 * Sequence:
 *  - purge the site's cached objects;
 *  - flag every echo object still linked on the client as deleted and
 *    purge again;
 *  - poll once per second (purging each time) until the client's object
 *    list is empty;
 *  - clean up the obd (and, with server support, the FID and local object
 *    storage state);
 *  - free the devices below, unless the next device is a metadata device;
 *  - release the site, the cl_device and the echo_device itself.
 *
 * Always returns NULL.
 */
static struct lu_device *echo_device_free(const struct lu_env *env,
                                          struct lu_device *d)
{
        struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
        struct echo_client_obd *ec   = ed->ed_ec;
        struct echo_object     *eco;
        struct lu_device       *next = ed->ed_next;

        CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
               ed, next);

        lu_site_purge(env, ed->ed_site, -1);

        /*
         * Check whether any objects are still alive.
         * There should be none, because lu_site_purge() is expected to clean
         * up all cached objects; anything left suggests the echo device is
         * being accessed in parallel. Mark the survivors as deleted.
         */
        spin_lock(&ec->ec_lock);
        list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
                eco->eo_deleted = 1;
        spin_unlock(&ec->ec_lock);

        /* purge again */
        lu_site_purge(env, ed->ed_site, -1);

        CDEBUG(D_INFO,
               "Waiting for the reference of echo object to be dropped\n");

        /*
         * Wait for the last reference to be dropped. ec_lock is released
         * around the sleep and the purge, and re-taken before the list is
         * tested again.
         */
        spin_lock(&ec->ec_lock);
        while (!list_empty(&ec->ec_objects)) {
                spin_unlock(&ec->ec_lock);
                CERROR(
                       "echo_client still has objects at cleanup time, wait for 1 second\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(cfs_time_seconds(1));
                lu_site_purge(env, ed->ed_site, -1);
                spin_lock(&ec->ec_lock);
        }
        spin_unlock(&ec->ec_lock);

        /* With no objects left, no echo lock may remain registered either. */
        LASSERT(list_empty(&ec->ec_locks));

        CDEBUG(D_INFO, "No object exists, exiting...\n");

        /* NOTE(review): the return value of echo_client_cleanup() is ignored. */
        echo_client_cleanup(d->ld_obd);
#ifdef HAVE_SERVER_SUPPORT
        echo_fid_fini(d->ld_obd);
        echo_ed_los_fini(env, ed);
#endif
        /* Free the lower devices, following whatever each free call returns. */
        while (next && !ed->ed_next_ismd)
                next = next->ld_type->ldt_ops->ldto_device_free(env, next);

        LASSERT(ed->ed_site == d->ld_site);
        echo_site_fini(env, ed);
        cl_device_fini(&ed->ed_cl);
        OBD_FREE_PTR(ed);

        /* ed is gone from here on; presumably drops all cached cl envs. */
        cl_env_cache_purge(~0);

        return NULL;
}
1094
/*
 * Operation table of the echo client device type: type-level init/fini and
 * start/stop hooks, plus per-device alloc/free/init/fini methods. It is
 * attached to echo_device_type through its ldt_ops field.
 */
static const struct lu_device_type_operations echo_device_type_ops = {
        .ldto_init = echo_type_init,
        .ldto_fini = echo_type_fini,

        .ldto_start = echo_type_start,
        .ldto_stop  = echo_type_stop,

        .ldto_device_alloc = echo_device_alloc,
        .ldto_device_free  = echo_device_free,
        .ldto_device_init  = echo_device_init,
        .ldto_device_fini  = echo_device_fini
};
1107
/*
 * Device type descriptor of the echo client: tagged as a cl device, named
 * LUSTRE_ECHO_CLIENT_NAME and served by echo_device_type_ops. The context
 * tags request CL, MD and DT thread contexts.
 */
static struct lu_device_type echo_device_type = {
        .ldt_tags     = LU_DEVICE_CL,
        .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
        .ldt_ops      = &echo_device_type_ops,
        .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
};
1114 /** @} echo_init */
1115
1116 /** \defgroup echo_exports Exported operations
1117  *
1118  * exporting functions to echo client
1119  *
1120  * @{
1121  */
1122
1123 /* Interfaces to echo client obd device */
/*
 * Find the echo object identified by @oi on echo device @d.
 *
 * @oi must have a non-zero object id and the FID_SEQ_ECHO sequence (both
 * are asserted).
 *
 * Returns the echo object on success, or an ERR_PTR():
 *   -ENODEV  the obd is stopping;
 *   -ENOMEM  the lov_oinfo could not be allocated;
 *   -EAGAIN  the object was found but is already marked deleted;
 *   or whatever cl_env_get(), ostid_to_fid() or cl_object_find() failed
 *   with.
 */
static struct echo_object *
cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
{
        struct lu_env *env;
        struct echo_thread_info *info;
        struct echo_object_conf *conf;
        struct echo_object *eco;
        struct cl_object *obj;
        struct lov_oinfo *oinfo = NULL;
        struct lu_fid *fid;
        __u16  refcheck;
        int rc;

        ENTRY;
        LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
        LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));

        /* Never return an object if the obd is to be freed. */
        if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
                RETURN(ERR_PTR(-ENODEV));

        env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                RETURN((void *)env);

        info = echo_env_info(env);
        conf = &info->eti_conf;
        /* A lov_oinfo is only prepared when there is a device below us. */
        if (d->ed_next) {
                OBD_ALLOC_PTR(oinfo);
                if (!oinfo)
                        GOTO(out, eco = ERR_PTR(-ENOMEM));

                oinfo->loi_oi = *oi;
                conf->eoc_cl.u.coc_oinfo = oinfo;
        }

        /*
         * If echo_object_init() is successful then ownership of oinfo
         * is transferred to the object.
         * The address of the local pointer is passed; presumably the callee
         * clears it when it takes ownership, so that the free at "out" is
         * skipped - TODO confirm against echo_object_init().
         */
        conf->eoc_oinfo = &oinfo;

        fid = &info->eti_fid;
        rc = ostid_to_fid(fid, oi, 0);
        if (rc != 0)
                GOTO(out, eco = ERR_PTR(rc));

        /*
         * In the function below, .hs_keycmp resolves to
         * lu_obj_hop_keycmp()
         */
        /* coverity[overrun-buffer-val] */
        obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
        if (IS_ERR(obj))
                GOTO(out, eco = (void *)obj);

        eco = cl2echo_obj(obj);
        /* Do not hand out an object that is already marked deleted. */
        if (eco->eo_deleted) {
                cl_object_put(env, obj);
                eco = ERR_PTR(-EAGAIN);
        }

out:
        /* Still non-NULL here means nobody took ownership of oinfo. */
        if (oinfo)
                OBD_FREE_PTR(oinfo);

        cl_env_put(env, &refcheck);
        RETURN(eco);
}
1193
1194 static int cl_echo_object_put(struct echo_object *eco)
1195 {
1196         struct lu_env *env;
1197         struct cl_object *obj = echo_obj2cl(eco);
1198         __u16  refcheck;
1199
1200         ENTRY;
1201         env = cl_env_get(&refcheck);
1202         if (IS_ERR(env))
1203                 RETURN(PTR_ERR(env));
1204
1205         /* an external function to kill an object? */
1206         if (eco->eo_deleted) {
1207                 struct lu_object_header *loh = obj->co_lu.lo_header;
1208
1209                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1210                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1211         }
1212
1213         cl_object_put(env, obj);
1214         cl_env_put(env, &refcheck);
1215         RETURN(0);
1216 }
1217
1218 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1219                             u64 start, u64 end, int mode,
1220                             __u64 *cookie, __u32 enqflags)
1221 {
1222         struct cl_io *io;
1223         struct cl_lock *lck;
1224         struct cl_object *obj;
1225         struct cl_lock_descr *descr;
1226         struct echo_thread_info *info;
1227         int rc = -ENOMEM;
1228
1229         ENTRY;
1230         info = echo_env_info(env);
1231         io = &info->eti_io;
1232         lck = &info->eti_lock;
1233         obj = echo_obj2cl(eco);
1234
1235         memset(lck, 0, sizeof(*lck));
1236         descr = &lck->cll_descr;
1237         descr->cld_obj   = obj;
1238         descr->cld_start = cl_index(obj, start);
1239         descr->cld_end   = cl_index(obj, end);
1240         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1241         descr->cld_enq_flags = enqflags;
1242         io->ci_obj = obj;
1243
1244         rc = cl_lock_request(env, io, lck);
1245         if (rc == 0) {
1246                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1247                 struct echo_lock *el;
1248
1249                 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1250                 spin_lock(&ec->ec_lock);
1251                 if (list_empty(&el->el_chain)) {
1252                         list_add(&el->el_chain, &ec->ec_locks);
1253                         el->el_cookie = ++ec->ec_unique;
1254                 }
1255                 atomic_inc(&el->el_refcount);
1256                 *cookie = el->el_cookie;
1257                 spin_unlock(&ec->ec_lock);
1258         }
1259         RETURN(rc);
1260 }
1261
1262 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1263                            __u64 cookie)
1264 {
1265         struct echo_client_obd *ec = ed->ed_ec;
1266         struct echo_lock *ecl = NULL;
1267         struct list_head *el;
1268         int found = 0, still_used = 0;
1269
1270         ENTRY;
1271         LASSERT(ec != NULL);
1272         spin_lock(&ec->ec_lock);
1273         list_for_each(el, &ec->ec_locks) {
1274                 ecl = list_entry(el, struct echo_lock, el_chain);
1275                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1276                 found = (ecl->el_cookie == cookie);
1277                 if (found) {
1278                         if (atomic_dec_and_test(&ecl->el_refcount))
1279                                 list_del_init(&ecl->el_chain);
1280                         else
1281                                 still_used = 1;
1282                         break;
1283                 }
1284         }
1285         spin_unlock(&ec->ec_lock);
1286
1287         if (!found)
1288                 RETURN(-ENOENT);
1289
1290         echo_lock_release(env, ecl, still_used);
1291         RETURN(0);
1292 }
1293
1294 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1295                                  struct cl_page *page)
1296 {
1297         struct echo_thread_info *info;
1298         struct cl_2queue        *queue;
1299
1300         info = echo_env_info(env);
1301         LASSERT(io == &info->eti_io);
1302
1303         queue = &info->eti_queue;
1304         cl_page_list_add(&queue->c2_qout, page);
1305 }
1306
/*
 * Read or write @npages pages of echo object @eco starting at byte @offset.
 *
 * @rw      READ selects a read (PR lock, CRT_READ); anything else is
 *          treated as a write (PW lock, CRT_WRITE).
 * @offset  must be page aligned (asserted).
 * @pages   array of @npages non-NULL pages (each entry is asserted).
 * @async   when non-zero and the request is a write, the pages are handed
 *          to cl_io_commit_async(); otherwise they are submitted
 *          synchronously.
 *
 * Requires a device below the echo device (ed_next is asserted).
 *
 * Returns 0 on success or a negative errno from any of the steps.
 */
static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
                              struct page **pages, int npages, int async)
{
        struct lu_env           *env;
        struct echo_thread_info *info;
        struct cl_object        *obj = echo_obj2cl(eco);
        struct echo_device      *ed  = eco->eo_dev;
        struct cl_2queue        *queue;
        struct cl_io            *io;
        struct cl_page          *clp;
        struct lustre_handle    lh = { 0 };
        int page_size = cl_page_size(obj);
        int rc;
        int i;
        __u16 refcheck;

        ENTRY;
        LASSERT((offset & ~PAGE_MASK) == 0);
        LASSERT(ed->ed_next != NULL);
        env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));

        /* The io and the page queue live in the per-thread echo info. */
        info    = echo_env_info(env);
        io      = &info->eti_io;
        queue   = &info->eti_queue;

        cl_2queue_init(queue);

        io->ci_ignore_layout = 1;
        rc = cl_io_init(env, io, CIT_MISC, obj);
        if (rc < 0)
                GOTO(out, rc);
        LASSERT(rc == 0);

        /* Lock the whole byte range that is about to be transferred. */
        rc = cl_echo_enqueue0(env, eco, offset,
                              offset + npages * PAGE_SIZE - 1,
                              rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
                              CEF_NEVER);
        if (rc < 0)
                GOTO(error_lock, rc);

        /* Wrap every caller page in a transient cl_page owned by this io. */
        for (i = 0; i < npages; i++) {
                LASSERT(pages[i]);
                clp = cl_page_find(env, obj, cl_index(obj, offset),
                                   pages[i], CPT_TRANSIENT);
                if (IS_ERR(clp)) {
                        rc = PTR_ERR(clp);
                        break;
                }
                LASSERT(clp->cp_type == CPT_TRANSIENT);

                rc = cl_page_own(env, io, clp);
                if (rc) {
                        LASSERT(clp->cp_state == CPS_FREEING);
                        cl_page_put(env, clp);
                        break;
                }

                cl_2queue_add(queue, clp);

                /*
                 * drop the reference count for cl_page_find, so that the page
                 * will be freed in cl_2queue_fini.
                 */
                cl_page_put(env, clp);
                cl_page_clip(env, clp, 0, page_size);

                offset += page_size;
        }

        /* Submit only if every page was queued successfully. */
        if (rc == 0) {
                enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;

                /* Asynchronous commit is only used for writes. */
                async = async && (typ == CRT_WRITE);
                if (async)
                        rc = cl_io_commit_async(env, io, &queue->c2_qin,
                                                0, PAGE_SIZE,
                                                echo_commit_callback);
                else
                        rc = cl_io_submit_sync(env, io, typ, queue, 0);
                /* NOTE(review): the message says "write" even for reads. */
                CDEBUG(D_INFO, "echo_client %s write returns %d\n",
                       async ? "async" : "sync", rc);
        }

        /* The lock was taken above, so drop it on success and failure. */
        cl_echo_cancel0(env, ed, lh.cookie);
        EXIT;
error_lock:
        cl_2queue_discard(env, io, queue);
        cl_2queue_disown(env, io, queue);
        cl_2queue_fini(env, queue);
        cl_io_fini(env, io);
out:
        cl_env_put(env, &refcheck);
        return rc;
}
1403 /** @} echo_exports */
1404
/*
 * File-scope object id counter. Its users are outside this part of the
 * file; presumably it remembers the most recently assigned echo object id -
 * TODO confirm.
 */
static u64 last_object_id;
1406
1407 #ifdef HAVE_SERVER_SUPPORT
1408 static inline void echo_md_build_name(struct lu_name *lname, char *name,
1409                                       __u64 id)
1410 {
1411         snprintf(name, ETI_NAME_LEN, "%llu", id);
1412         lname->ln_name = name;
1413         lname->ln_namelen = strlen(name);
1414 }
1415
1416 /* similar to mdt_attr_get_complex */
1417 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1418                             struct md_attr *ma)
1419 {
1420         struct echo_thread_info *info = echo_env_info(env);
1421         int rc;
1422
1423         ENTRY;
1424
1425         LASSERT(ma->ma_lmm_size > 0);
1426
1427         LASSERT(ma->ma_need & (MA_LOV | MA_LMV));
1428         if (ma->ma_need & MA_LOV)
1429                 rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1430         else
1431                 rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LMV);
1432
1433         if (rc < 0)
1434                 RETURN(rc);
1435
1436         /* big_lmm may need to be grown */
1437         if (info->eti_big_lmmsize < rc) {
1438                 int size = size_roundup_power2(rc);
1439
1440                 if (info->eti_big_lmmsize > 0) {
1441                         /* free old buffer */
1442                         LASSERT(info->eti_big_lmm);
1443                         OBD_FREE_LARGE(info->eti_big_lmm,
1444                                        info->eti_big_lmmsize);
1445                         info->eti_big_lmm = NULL;
1446                         info->eti_big_lmmsize = 0;
1447                 }
1448
1449                 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1450                 if (!info->eti_big_lmm)
1451                         RETURN(-ENOMEM);
1452                 info->eti_big_lmmsize = size;
1453         }
1454         LASSERT(info->eti_big_lmmsize >= rc);
1455
1456         info->eti_buf.lb_buf = info->eti_big_lmm;
1457         info->eti_buf.lb_len = info->eti_big_lmmsize;
1458         if (ma->ma_need & MA_LOV)
1459                 rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1460         else
1461                 rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LMV);
1462         if (rc < 0)
1463                 RETURN(rc);
1464
1465         if (ma->ma_need & MA_LOV)
1466                 ma->ma_valid |= MA_LOV;
1467         else
1468                 ma->ma_valid |= MA_LMV;
1469
1470         ma->ma_lmm = info->eti_big_lmm;
1471         ma->ma_lmm_size = rc;
1472
1473         RETURN(0);
1474 }
1475
/*
 * Collect the attributes requested in ma->ma_need for metadata object @next.
 *
 * ma->ma_valid is cleared first and then gets one bit per attribute that
 * was actually obtained:
 *   MA_INODE    inode attributes via mo_attr_get();
 *   MA_LOV      LOV xattr (regular files and directories only);
 *   MA_LMV      LMV xattr (directories only);
 *   MA_ACL_DEF  default ACL (directories only, CONFIG_FS_POSIX_ACL builds).
 *
 * For LOV and LMV the caller's ma_lmm buffer is tried first; on -ERANGE the
 * value is fetched into the per-thread big buffer by echo_big_lmm_get().
 * -ENODATA is not an error: the corresponding size is set to 0.
 *
 * Returns 0 on success or a negative errno.
 */
static int echo_attr_get_complex(const struct lu_env *env,
                                 struct md_object *next,
                                 struct md_attr *ma)
{
        struct echo_thread_info *info = echo_env_info(env);
        struct lu_buf           *buf = &info->eti_buf;
        umode_t                  mode = lu_object_attr(&next->mo_lu);
        int                      rc = 0, rc2;

        ENTRY;

        ma->ma_valid = 0;

        if (ma->ma_need & MA_INODE) {
                rc = mo_attr_get(env, next, ma);
                if (rc)
                        GOTO(out, rc);
                ma->ma_valid |= MA_INODE;
        }

        if ((ma->ma_need & MA_LOV) && (S_ISREG(mode) || S_ISDIR(mode))) {
                LASSERT(ma->ma_lmm_size > 0);
                buf->lb_buf = ma->ma_lmm;
                buf->lb_len = ma->ma_lmm_size;
                rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
                if (rc2 > 0) {
                        ma->ma_lmm_size = rc2;
                        ma->ma_valid |= MA_LOV;
                } else if (rc2 == -ENODATA) {
                        /* no LOV EA */
                        ma->ma_lmm_size = 0;
                } else if (rc2 == -ERANGE) {
                        /* caller's buffer too small: use the big buffer */
                        rc2 = echo_big_lmm_get(env, next, ma);
                        if (rc2 < 0)
                                GOTO(out, rc = rc2);
                } else {
                        /*
                         * Any other result ends the function here; note
                         * that this includes rc2 == 0, which leaves with
                         * rc = 0 and skips the remaining attributes.
                         */
                        GOTO(out, rc = rc2);
                }
        }

        if ((ma->ma_need & MA_LMV) && S_ISDIR(mode)) {
                LASSERT(ma->ma_lmm_size > 0);
                buf->lb_buf = ma->ma_lmm;
                buf->lb_len = ma->ma_lmm_size;
                rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV);
                if (rc2 > 0) {
                        ma->ma_lmm_size = rc2;
                        ma->ma_valid |= MA_LMV;
                } else if (rc2 == -ENODATA) {
                        /* no LMV EA */
                        ma->ma_lmm_size = 0;
                } else if (rc2 == -ERANGE) {
                        /* caller's buffer too small: use the big buffer */
                        rc2 = echo_big_lmm_get(env, next, ma);
                        if (rc2 < 0)
                                GOTO(out, rc = rc2);
                } else {
                        /* same handling as for LOV, rc2 == 0 included */
                        GOTO(out, rc = rc2);
                }
        }

#ifdef CONFIG_FS_POSIX_ACL
        if ((ma->ma_need & MA_ACL_DEF) && S_ISDIR(mode)) {
                buf->lb_buf = ma->ma_acl;
                buf->lb_len = ma->ma_acl_size;
                rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
                if (rc2 > 0) {
                        ma->ma_acl_size = rc2;
                        ma->ma_valid |= MA_ACL_DEF;
                } else if (rc2 == -ENODATA) {
                        /* no ACLs */
                        ma->ma_acl_size = 0;
                } else {
                        GOTO(out, rc = rc2);
                }
        }
#endif
out:
        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
               rc, ma->ma_valid, ma->ma_lmm);
        RETURN(rc);
}
1557
1558 static int
1559 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1560                         struct md_object *parent, struct lu_fid *fid,
1561                         struct lu_name *lname, struct md_op_spec *spec,
1562                         struct md_attr *ma)
1563 {
1564         struct lu_object        *ec_child, *child;
1565         struct lu_device        *ld = ed->ed_next;
1566         struct echo_thread_info *info = echo_env_info(env);
1567         struct lu_fid           *fid2 = &info->eti_fid2;
1568         struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
1569         int                      rc;
1570
1571         ENTRY;
1572
1573         rc = mdo_lookup(env, parent, lname, fid2, spec);
1574         if (rc == 0)
1575                 return -EEXIST;
1576         else if (rc != -ENOENT)
1577                 return rc;
1578
1579         ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1580                                      fid, &conf);
1581         if (IS_ERR(ec_child)) {
1582                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1583                         PTR_ERR(ec_child));
1584                 RETURN(PTR_ERR(ec_child));
1585         }
1586
1587         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1588         if (!child) {
1589                 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1590                 GOTO(out_put, rc = -EINVAL);
1591         }
1592
1593         CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1594                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1595
1596         /*
1597          * Do not perform lookup sanity check. We know that name does not exist.
1598          */
1599         spec->sp_cr_lookup = 0;
1600         rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1601         if (rc) {
1602                 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1603                 GOTO(out_put, rc);
1604         }
1605         CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
1606                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1607         EXIT;
1608 out_put:
1609         lu_object_put(env, ec_child);
1610         return rc;
1611 }
1612
1613 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1614                              struct md_attr *ma)
1615 {
1616         struct echo_thread_info *info = echo_env_info(env);
1617
1618         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1619                 ma->ma_lmm = (void *)&info->eti_lmm;
1620                 ma->ma_lmm_size = sizeof(info->eti_lmm);
1621         } else {
1622                 LASSERT(info->eti_big_lmmsize);
1623                 ma->ma_lmm = info->eti_big_lmm;
1624                 ma->ma_lmm_size = info->eti_big_lmmsize;
1625         }
1626
1627         return 0;
1628 }
1629
1630 static int
1631 echo_md_dir_stripe_choose(const struct lu_env *env, struct echo_device *ed,
1632                           struct lu_object *obj, const char *name,
1633                           unsigned int namelen, __u64 id,
1634                           struct lu_object **new_parent)
1635 {
1636         struct echo_thread_info *info = echo_env_info(env);
1637         struct md_attr          *ma = &info->eti_ma;
1638         struct lmv_mds_md_v1    *lmv;
1639         struct lu_device        *ld = ed->ed_next;
1640         unsigned int            idx;
1641         struct lu_name          tmp_ln_name;
1642         struct lu_fid           stripe_fid;
1643         struct lu_object        *stripe_obj;
1644         int                     rc;
1645
1646         LASSERT(obj != NULL);
1647         LASSERT(S_ISDIR(obj->lo_header->loh_attr));
1648
1649         memset(ma, 0, sizeof(*ma));
1650         echo_set_lmm_size(env, ld, ma);
1651         ma->ma_need = MA_LMV;
1652         rc = echo_attr_get_complex(env, lu2md(obj), ma);
1653         if (rc) {
1654                 CERROR("Can not getattr child "DFID": rc = %d\n",
1655                         PFID(lu_object_fid(obj)), rc);
1656                 return rc;
1657         }
1658
1659         if (!(ma->ma_valid & MA_LMV)) {
1660                 *new_parent = obj;
1661                 return 0;
1662         }
1663
1664         lmv = (struct lmv_mds_md_v1 *)ma->ma_lmm;
1665         if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) {
1666                 rc = -EINVAL;
1667                 CERROR("Invalid mds md magic %x "DFID": rc = %d\n",
1668                        le32_to_cpu(lmv->lmv_magic), PFID(lu_object_fid(obj)),
1669                        rc);
1670                 return rc;
1671         }
1672
1673         if (name) {
1674                 tmp_ln_name.ln_name = name;
1675                 tmp_ln_name.ln_namelen = namelen;
1676         } else {
1677                 LASSERT(id != -1);
1678                 echo_md_build_name(&tmp_ln_name, info->eti_name, id);
1679         }
1680
1681         idx = lmv_name_to_stripe_index(LMV_HASH_TYPE_FNV_1A_64,
1682                                 le32_to_cpu(lmv->lmv_stripe_count),
1683                                 tmp_ln_name.ln_name, tmp_ln_name.ln_namelen);
1684
1685         LASSERT(idx < le32_to_cpu(lmv->lmv_stripe_count));
1686         fid_le_to_cpu(&stripe_fid, &lmv->lmv_stripe_fids[idx]);
1687
1688         stripe_obj = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, &stripe_fid,
1689                                        NULL);
1690         if (IS_ERR(stripe_obj)) {
1691                 rc = PTR_ERR(stripe_obj);
1692                 CERROR("Can not find the parent "DFID": rc = %d\n",
1693                        PFID(&stripe_fid), rc);
1694                 return rc;
1695         }
1696
1697         *new_parent = lu_object_locate(stripe_obj->lo_header, ld->ld_type);
1698         if (!*new_parent) {
1699                 lu_object_put(env, stripe_obj);
1700                 RETURN(-ENXIO);
1701         }
1702
1703         return rc;
1704 }
1705
1706 static int echo_create_md_object(const struct lu_env *env,
1707                                  struct echo_device *ed,
1708                                  struct lu_object *ec_parent,
1709                                  struct lu_fid *fid,
1710                                  char *name, int namelen,
1711                                   __u64 id, __u32 mode, int count,
1712                                  int stripe_count, int stripe_offset)
1713 {
1714         struct lu_object *parent;
1715         struct lu_object *new_parent;
1716         struct echo_thread_info *info = echo_env_info(env);
1717         struct lu_name *lname = &info->eti_lname;
1718         struct md_op_spec *spec = &info->eti_spec;
1719         struct md_attr *ma = &info->eti_ma;
1720         struct lu_device *ld = ed->ed_next;
1721         int rc = 0;
1722         int i;
1723
1724         ENTRY;
1725
1726         if (!ec_parent)
1727                 return -1;
1728         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1729         if (!parent)
1730                 RETURN(-ENXIO);
1731
1732         rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
1733                                        id, &new_parent);
1734         if (rc != 0)
1735                 RETURN(rc);
1736
1737         LASSERT(new_parent != NULL);
1738         memset(ma, 0, sizeof(*ma));
1739         memset(spec, 0, sizeof(*spec));
1740         echo_set_lmm_size(env, ld, ma);
1741         if (stripe_count != 0) {
1742                 spec->sp_cr_flags |= MDS_FMODE_WRITE;
1743                 if (stripe_count != -1) {
1744                         if (S_ISDIR(mode)) {
1745                                 struct lmv_user_md *lmu;
1746
1747                                 lmu = (struct lmv_user_md *)&info->eti_lum;
1748                                 lmu->lum_magic = LMV_USER_MAGIC;
1749                                 lmu->lum_stripe_offset = stripe_offset;
1750                                 lmu->lum_stripe_count = stripe_count;
1751                                 lmu->lum_hash_type = LMV_HASH_TYPE_FNV_1A_64;
1752                                 spec->u.sp_ea.eadata = lmu;
1753                                 spec->u.sp_ea.eadatalen = sizeof(*lmu);
1754                         } else {
1755                                 struct lov_user_md_v3 *lum = &info->eti_lum;
1756
1757                                 lum->lmm_magic = LOV_USER_MAGIC_V3;
1758                                 lum->lmm_stripe_count = stripe_count;
1759                                 lum->lmm_stripe_offset = stripe_offset;
1760                                 lum->lmm_pattern = LOV_PATTERN_NONE;
1761                                 spec->u.sp_ea.eadata = lum;
1762                                 spec->u.sp_ea.eadatalen = sizeof(*lum);
1763                         }
1764                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1765                 }
1766         }
1767
1768         ma->ma_attr.la_mode = mode;
1769         ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1770         ma->ma_attr.la_ctime = ktime_get_real_seconds();
1771
1772         if (name) {
1773                 lname->ln_name = name;
1774                 lname->ln_namelen = namelen;
1775                 /* If name is specified, only create one object by name */
1776                 rc = echo_md_create_internal(env, ed, lu2md(new_parent), fid,
1777                                              lname, spec, ma);
1778                 GOTO(out_put, rc);
1779         }
1780
1781         /* Create multiple object sequenced by id */
1782         for (i = 0; i < count; i++) {
1783                 char *tmp_name = info->eti_name;
1784
1785                 echo_md_build_name(lname, tmp_name, id);
1786
1787                 rc = echo_md_create_internal(env, ed, lu2md(new_parent),
1788                                              fid, lname, spec, ma);
1789                 if (rc) {
1790                         CERROR("Can not create child %s: rc = %d\n", tmp_name,
1791                                 rc);
1792                         break;
1793                 }
1794                 id++;
1795                 fid->f_oid++;
1796         }
1797
1798 out_put:
1799         if (new_parent != parent)
1800                 lu_object_put(env, new_parent);
1801
1802         RETURN(rc);
1803 }
1804
1805 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1806                                         struct echo_device *ed,
1807                                         struct md_object *parent,
1808                                         struct lu_name *lname)
1809 {
1810         struct echo_thread_info *info = echo_env_info(env);
1811         struct lu_fid *fid = &info->eti_fid;
1812         struct lu_object *child;
1813         int rc;
1814
1815         ENTRY;
1816         CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1817                PFID(fid), parent);
1818
1819         rc = mdo_lookup(env, parent, lname, fid, NULL);
1820         if (rc) {
1821                 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1822                 RETURN(ERR_PTR(rc));
1823         }
1824
1825         /*
1826          * In the function below, .hs_keycmp resolves to
1827          * lu_obj_hop_keycmp()
1828          */
1829         /* coverity[overrun-buffer-val] */
1830         child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1831
1832         RETURN(child);
1833 }
1834
1835 static int echo_setattr_object(const struct lu_env *env,
1836                                struct echo_device *ed,
1837                                struct lu_object *ec_parent,
1838                                __u64 id, int count)
1839 {
1840         struct lu_object *parent;
1841         struct lu_object *new_parent;
1842         struct echo_thread_info *info = echo_env_info(env);
1843         struct lu_name *lname = &info->eti_lname;
1844         char *name = info->eti_name;
1845         struct lu_device *ld = ed->ed_next;
1846         struct lu_buf *buf = &info->eti_buf;
1847         int rc = 0;
1848         int i;
1849
1850         ENTRY;
1851
1852         if (!ec_parent)
1853                 return -1;
1854         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1855         if (!parent)
1856                 RETURN(-ENXIO);
1857
1858         rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
1859                                        &new_parent);
1860         if (rc != 0)
1861                 RETURN(rc);
1862
1863         for (i = 0; i < count; i++) {
1864                 struct lu_object *ec_child, *child;
1865
1866                 echo_md_build_name(lname, name, id);
1867
1868                 ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
1869                 if (IS_ERR(ec_child)) {
1870                         rc = PTR_ERR(ec_child);
1871                         CERROR("Can't find child %s: rc = %d\n",
1872                                 lname->ln_name, rc);
1873                         break;
1874                 }
1875
1876                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1877                 if (!child) {
1878                         CERROR("Can not locate the child %s\n", lname->ln_name);
1879                         lu_object_put(env, ec_child);
1880                         rc = -EINVAL;
1881                         break;
1882                 }
1883
1884                 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1885                        PFID(lu_object_fid(child)));
1886
1887                 buf->lb_buf = info->eti_xattr_buf;
1888                 buf->lb_len = sizeof(info->eti_xattr_buf);
1889
1890                 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1891                 rc = mo_xattr_set(env, lu2md(child), buf, name,
1892                                   LU_XATTR_CREATE);
1893                 if (rc < 0) {
1894                         CERROR("Can not setattr child "DFID": rc = %d\n",
1895                                 PFID(lu_object_fid(child)), rc);
1896                         lu_object_put(env, ec_child);
1897                         break;
1898                 }
1899                 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1900                        PFID(lu_object_fid(child)));
1901                 id++;
1902                 lu_object_put(env, ec_child);
1903         }
1904
1905         if (new_parent != parent)
1906                 lu_object_put(env, new_parent);
1907
1908         RETURN(rc);
1909 }
1910
1911 static int echo_getattr_object(const struct lu_env *env,
1912                                struct echo_device *ed,
1913                                struct lu_object *ec_parent,
1914                                __u64 id, int count)
1915 {
1916         struct lu_object *parent;
1917         struct lu_object *new_parent;
1918         struct echo_thread_info *info = echo_env_info(env);
1919         struct lu_name *lname = &info->eti_lname;
1920         char *name = info->eti_name;
1921         struct md_attr *ma = &info->eti_ma;
1922         struct lu_device *ld = ed->ed_next;
1923         int rc = 0;
1924         int i;
1925
1926         ENTRY;
1927
1928         if (!ec_parent)
1929                 return -1;
1930         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1931         if (!parent)
1932                 RETURN(-ENXIO);
1933
1934         rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
1935                                        &new_parent);
1936         if (rc != 0)
1937                 RETURN(rc);
1938
1939         memset(ma, 0, sizeof(*ma));
1940         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1941         ma->ma_acl = info->eti_xattr_buf;
1942         ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1943
1944         for (i = 0; i < count; i++) {
1945                 struct lu_object *ec_child, *child;
1946
1947                 ma->ma_valid = 0;
1948                 echo_md_build_name(lname, name, id);
1949                 echo_set_lmm_size(env, ld, ma);
1950
1951                 ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
1952                 if (IS_ERR(ec_child)) {
1953                         CERROR("Can't find child %s: rc = %ld\n",
1954                                lname->ln_name, PTR_ERR(ec_child));
1955                         RETURN(PTR_ERR(ec_child));
1956                 }
1957
1958                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1959                 if (!child) {
1960                         CERROR("Can not locate the child %s\n", lname->ln_name);
1961                         lu_object_put(env, ec_child);
1962                         RETURN(-EINVAL);
1963                 }
1964
1965                 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1966                        PFID(lu_object_fid(child)));
1967                 rc = echo_attr_get_complex(env, lu2md(child), ma);
1968                 if (rc) {
1969                         CERROR("Can not getattr child "DFID": rc = %d\n",
1970                                 PFID(lu_object_fid(child)), rc);
1971                         lu_object_put(env, ec_child);
1972                         break;
1973                 }
1974                 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1975                        PFID(lu_object_fid(child)));
1976                 id++;
1977                 lu_object_put(env, ec_child);
1978         }
1979
1980         if (new_parent != parent)
1981                 lu_object_put(env, new_parent);
1982
1983         RETURN(rc);
1984 }
1985
1986 static int echo_lookup_object(const struct lu_env *env,
1987                               struct echo_device *ed,
1988                               struct lu_object *ec_parent,
1989                               __u64 id, int count)
1990 {
1991         struct lu_object *parent;
1992         struct lu_object *new_parent;
1993         struct echo_thread_info *info = echo_env_info(env);
1994         struct lu_name *lname = &info->eti_lname;
1995         char *name = info->eti_name;
1996         struct lu_fid *fid = &info->eti_fid;
1997         struct lu_device *ld = ed->ed_next;
1998         int rc = 0;
1999         int i;
2000
2001         if (!ec_parent)
2002                 return -1;
2003         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
2004         if (!parent)
2005                 return -ENXIO;
2006
2007         rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
2008                                        &new_parent);
2009         if (rc != 0)
2010                 RETURN(rc);
2011
2012         /*prepare the requests*/
2013         for (i = 0; i < count; i++) {
2014                 echo_md_build_name(lname, name, id);
2015
2016                 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
2017                        PFID(lu_object_fid(new_parent)), lname->ln_name,
2018                        new_parent);
2019
2020                 rc = mdo_lookup(env, lu2md(new_parent), lname, fid, NULL);
2021                 if (rc) {
2022                         CERROR("Can not lookup child %s: rc = %d\n", name, rc);
2023                         break;
2024                 }
2025
2026                 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
2027                        PFID(lu_object_fid(new_parent)), lname->ln_name,
2028                        new_parent);
2029
2030                 id++;
2031         }
2032
2033         if (new_parent != parent)
2034                 lu_object_put(env, new_parent);
2035
2036         return rc;
2037 }
2038
2039 static int echo_md_destroy_internal(const struct lu_env *env,
2040                                     struct echo_device *ed,
2041                                     struct md_object *parent,
2042                                     struct lu_name *lname,
2043                                     struct md_attr *ma)
2044 {
2045         struct lu_device   *ld = ed->ed_next;
2046         struct lu_object   *ec_child;
2047         struct lu_object   *child;
2048         int                 rc;
2049
2050         ENTRY;
2051
2052         ec_child = echo_md_lookup(env, ed, parent, lname);
2053         if (IS_ERR(ec_child)) {
2054                 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
2055                         PTR_ERR(ec_child));
2056                 RETURN(PTR_ERR(ec_child));
2057         }
2058
2059         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
2060         if (!child) {
2061                 CERROR("Can not locate the child %s\n", lname->ln_name);
2062                 GOTO(out_put, rc = -EINVAL);
2063         }
2064
2065         if (lu_object_remote(child)) {
2066                 CERROR("Can not destroy remote object %s: rc = %d\n",
2067                        lname->ln_name, -EPERM);
2068                 GOTO(out_put, rc = -EPERM);
2069         }
2070         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
2071                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
2072
2073         rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
2074         if (rc) {
2075                 CERROR("Can not unlink child %s: rc = %d\n",
2076                         lname->ln_name, rc);
2077                 GOTO(out_put, rc);
2078         }
2079         CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
2080                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
2081 out_put:
2082         lu_object_put(env, ec_child);
2083         return rc;
2084 }
2085
2086 static int echo_destroy_object(const struct lu_env *env,
2087                                struct echo_device *ed,
2088                                struct lu_object *ec_parent,
2089                                char *name, int namelen,
2090                                __u64 id, __u32 mode,
2091                                int count)
2092 {
2093         struct echo_thread_info *info = echo_env_info(env);
2094         struct lu_name          *lname = &info->eti_lname;
2095         struct md_attr          *ma = &info->eti_ma;
2096         struct lu_device        *ld = ed->ed_next;
2097         struct lu_object        *parent;
2098         struct lu_object        *new_parent;
2099         int                      rc = 0;
2100         int                      i;
2101
2102         ENTRY;
2103         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
2104         if (!parent)
2105                 RETURN(-EINVAL);
2106
2107         rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
2108                                        id, &new_parent);
2109         if (rc != 0)
2110                 RETURN(rc);
2111
2112         memset(ma, 0, sizeof(*ma));
2113         ma->ma_attr.la_mode = mode;
2114         ma->ma_attr.la_valid = LA_CTIME;
2115         ma->ma_attr.la_ctime = ktime_get_real_seconds();
2116         ma->ma_need = MA_INODE;
2117         ma->ma_valid = 0;
2118
2119         if (name) {
2120                 lname->ln_name = name;
2121                 lname->ln_namelen = namelen;
2122                 rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
2123                                               ma);
2124                 GOTO(out_put, rc);
2125         }
2126
2127         /*prepare the requests*/
2128         for (i = 0; i < count; i++) {
2129                 char *tmp_name = info->eti_name;
2130
2131                 ma->ma_valid = 0;
2132                 echo_md_build_name(lname, tmp_name, id);
2133
2134                 rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
2135                                               ma);
2136                 if (rc) {
2137                         CERROR("Can not unlink child %s: rc = %d\n", name, rc);
2138                         break;
2139                 }
2140                 id++;
2141         }
2142
2143 out_put:
2144         if (new_parent != parent)
2145                 lu_object_put(env, new_parent);
2146
2147         RETURN(rc);
2148 }
2149
2150 static struct lu_object *echo_resolve_path(const struct lu_env *env,
2151                                            struct echo_device *ed, char *path,
2152                                            int path_len)
2153 {
2154         struct lu_device        *ld = ed->ed_next;
2155         struct echo_thread_info *info = echo_env_info(env);
2156         struct lu_fid           *fid = &info->eti_fid;
2157         struct lu_name          *lname = &info->eti_lname;
2158         struct lu_object        *parent = NULL;
2159         struct lu_object        *child = NULL;
2160         int                      rc = 0;
2161
2162         ENTRY;
2163         *fid = ed->ed_root_fid;
2164
2165         /*
2166          * In the function below, .hs_keycmp resolves to
2167          * lu_obj_hop_keycmp()
2168          */
2169         /* coverity[overrun-buffer-val] */
2170         parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
2171         if (IS_ERR(parent)) {
2172                 CERROR("Can not find the parent "DFID": rc = %ld\n",
2173                         PFID(fid), PTR_ERR(parent));
2174                 RETURN(parent);
2175         }
2176
2177         while (1) {
2178                 struct lu_object *ld_parent;
2179                 char *e;
2180
2181                 e = strsep(&path, "/");
2182                 if (!e)
2183                         break;
2184
2185                 if (e[0] == 0) {
2186                         if (!path || path[0] == '\0')
2187                                 break;
2188                         continue;
2189                 }
2190
2191                 lname->ln_name = e;
2192                 lname->ln_namelen = strlen(e);
2193
2194                 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
2195                 if (!ld_parent) {
2196                         lu_object_put(env, parent);
2197                         rc = -EINVAL;
2198                         break;
2199                 }
2200
2201                 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
2202                 lu_object_put(env, parent);
2203                 if (IS_ERR(child)) {
2204                         rc = (int)PTR_ERR(child);
2205                         CERROR("lookup %s under parent "DFID": rc = %d\n",
2206                                 lname->ln_name, PFID(lu_object_fid(ld_parent)),
2207                                 rc);
2208                         break;
2209                 }
2210                 parent = child;
2211         }
2212         if (rc)
2213                 RETURN(ERR_PTR(rc));
2214
2215         RETURN(parent);
2216 }
2217
2218 static void echo_ucred_init(struct lu_env *env)
2219 {
2220         struct lu_ucred *ucred = lu_ucred(env);
2221
2222         ucred->uc_valid = UCRED_INVALID;
2223
2224         ucred->uc_suppgids[0] = -1;
2225         ucred->uc_suppgids[1] = -1;
2226
2227         ucred->uc_uid = ucred->uc_o_uid  =
2228                                 from_kuid(&init_user_ns, current_uid());
2229         ucred->uc_gid = ucred->uc_o_gid  =
2230                                 from_kgid(&init_user_ns, current_gid());
2231         ucred->uc_fsuid = ucred->uc_o_fsuid =
2232                                 from_kuid(&init_user_ns, current_fsuid());
2233         ucred->uc_fsgid = ucred->uc_o_fsgid =
2234                                 from_kgid(&init_user_ns, current_fsgid());
2235         ucred->uc_cap = cfs_curproc_cap_pack();
2236
2237         /* remove fs privilege for non-root user. */
2238         if (ucred->uc_fsuid)
2239                 ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2240         ucred->uc_valid = UCRED_NEW;
2241 }
2242
2243 static void echo_ucred_fini(struct lu_env *env)
2244 {
2245         struct lu_ucred *ucred = lu_ucred(env);
2246
2247         ucred->uc_valid = UCRED_INIT;
2248 }
2249
2250 static int echo_md_handler(struct echo_device *ed, int command,
2251                            char *path, int path_len, __u64 id, int count,
2252                            struct obd_ioctl_data *data)
2253 {
2254         struct echo_thread_info *info;
2255         struct lu_device *ld = ed->ed_next;
2256         struct lu_env *env;
2257         __u16 refcheck;
2258         struct lu_object *parent;
2259         char *name = NULL;
2260         int namelen = data->ioc_plen2;
2261         int rc = 0;
2262
2263         ENTRY;
2264         if (!ld) {
2265                 CERROR("MD echo client is not being initialized properly\n");
2266                 RETURN(-EINVAL);
2267         }
2268
2269         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2270                 CERROR("Only support MDD layer right now!\n");
2271                 RETURN(-EINVAL);
2272         }
2273
2274         env = cl_env_get(&refcheck);
2275         if (IS_ERR(env))
2276                 RETURN(PTR_ERR(env));
2277
2278         rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_SES_TAG);
2279         if (rc != 0)
2280                 GOTO(out_env, rc);
2281
2282         /* init big_lmm buffer */
2283         info = echo_env_info(env);
2284         LASSERT(info->eti_big_lmm == NULL);
2285         OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2286         if (!info->eti_big_lmm)
2287                 GOTO(out_env, rc = -ENOMEM);
2288         info->eti_big_lmmsize = MIN_MD_SIZE;
2289
2290         parent = echo_resolve_path(env, ed, path, path_len);
2291         if (IS_ERR(parent)) {
2292                 CERROR("Can not resolve the path %s: rc = %ld\n", path,
2293                         PTR_ERR(parent));
2294                 GOTO(out_free, rc = PTR_ERR(parent));
2295         }
2296
2297         if (namelen > 0) {
2298                 OBD_ALLOC(name, namelen + 1);
2299                 if (!name)
2300                         GOTO(out_put, rc = -ENOMEM);
2301                 if (copy_from_user(name, data->ioc_pbuf2, namelen))
2302                         GOTO(out_name, rc = -EFAULT);
2303         }
2304
2305         echo_ucred_init(env);
2306
2307         switch (command) {
2308         case ECHO_MD_CREATE:
2309         case ECHO_MD_MKDIR: {
2310                 struct echo_thread_info *info = echo_env_info(env);
2311                 __u32 mode = data->ioc_obdo2.o_mode;
2312                 struct lu_fid *fid = &info->eti_fid;
2313                 int stripe_count = (int)data->ioc_obdo2.o_misc;
2314                 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2315
2316                 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2317                 if (rc != 0)
2318                         break;
2319
2320                 /*
2321                  * In the function below, .hs_keycmp resolves to
2322                  * lu_obj_hop_keycmp()
2323                  */
2324                 /* coverity[overrun-buffer-val] */
2325                 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2326                                            id, mode, count, stripe_count,
2327                                            stripe_index);
2328                 break;
2329         }
2330         case ECHO_MD_DESTROY:
2331         case ECHO_MD_RMDIR: {
2332                 __u32 mode = data->ioc_obdo2.o_mode;
2333
2334                 rc = echo_destroy_object(env, ed, parent, name, namelen,
2335                                          id, mode, count);
2336                 break;
2337         }
2338         case ECHO_MD_LOOKUP:
2339                 rc = echo_lookup_object(env, ed, parent, id, count);
2340                 break;
2341         case ECHO_MD_GETATTR:
2342                 rc = echo_getattr_object(env, ed, parent, id, count);
2343                 break;
2344         case ECHO_MD_SETATTR:
2345                 rc = echo_setattr_object(env, ed, parent, id, count);
2346                 break;
2347         default:
2348                 CERROR("unknown command %d\n", command);
2349                 rc = -EINVAL;
2350                 break;
2351         }
2352         echo_ucred_fini(env);
2353
2354 out_name:
2355         if (name)
2356                 OBD_FREE(name, namelen + 1);
2357 out_put:
2358         lu_object_put(env, parent);
2359 out_free:
2360         LASSERT(info->eti_big_lmm);
2361         OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2362         info->eti_big_lmm = NULL;
2363         info->eti_big_lmmsize = 0;
2364 out_env:
2365         cl_env_put(env, &refcheck);
2366         return rc;
2367 }
2368 #endif /* HAVE_SERVER_SUPPORT */
2369
2370 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2371                               struct obdo *oa)
2372 {
2373         struct echo_object      *eco;
2374         struct echo_client_obd  *ec = ed->ed_ec;
2375         int created = 0;
2376         int rc;
2377
2378         ENTRY;
2379         if (!(oa->o_valid & OBD_MD_FLID) ||
2380             !(oa->o_valid & OBD_MD_FLGROUP) ||
2381             !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
2382                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2383                 RETURN(-EINVAL);
2384         }
2385
2386         if (ostid_id(&oa->o_oi) == 0) {
2387                 rc = ostid_set_id(&oa->o_oi, ++last_object_id);
2388                 if (rc)
2389                         GOTO(failed, rc);
2390         }
2391
2392         rc = obd_create(env, ec->ec_exp, oa);
2393         if (rc != 0) {
2394                 CERROR("Cannot create objects: rc = %d\n", rc);
2395                 GOTO(failed, rc);
2396         }
2397
2398         created = 1;
2399
2400         oa->o_valid |= OBD_MD_FLID;
2401
2402         eco = cl_echo_object_find(ed, &oa->o_oi);
2403         if (IS_ERR(eco))
2404                 GOTO(failed, rc = PTR_ERR(eco));
2405         cl_echo_object_put(eco);
2406
2407         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
2408         EXIT;
2409
2410 failed:
2411         if (created && rc != 0)
2412                 obd_destroy(env, ec->ec_exp, oa);
2413
2414         if (rc != 0)
2415                 CERROR("create object failed with: rc = %d\n", rc);
2416
2417         return rc;
2418 }
2419
2420 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2421                            struct obdo *oa)
2422 {
2423         struct echo_object *eco;
2424         int rc;
2425
2426         ENTRY;
2427         if (!(oa->o_valid & OBD_MD_FLID) ||
2428             !(oa->o_valid & OBD_MD_FLGROUP) ||
2429             ostid_id(&oa->o_oi) == 0) {
2430                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2431                 RETURN(-EINVAL);
2432         }
2433
2434         rc = 0;
2435         eco = cl_echo_object_find(ed, &oa->o_oi);
2436         if (!IS_ERR(eco))
2437                 *ecop = eco;
2438         else
2439                 rc = PTR_ERR(eco);
2440
2441         RETURN(rc);
2442 }
2443
2444 static void echo_put_object(struct echo_object *eco)
2445 {
2446         int rc;
2447
2448         rc = cl_echo_object_put(eco);
2449         if (rc)
2450                 CERROR("%s: echo client drop an object failed: rc = %d\n",
2451                        eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
2452 }
2453
2454 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
2455                                          u64 offset, u64 count)
2456 {
2457         char *addr;
2458         u64 stripe_off;
2459         u64 stripe_id;
2460         int delta;
2461
2462         /* no partial pages on the client */
2463         LASSERT(count == PAGE_SIZE);
2464
2465         addr = kmap(page);
2466
2467         for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2468                 if (rw == OBD_BRW_WRITE) {
2469                         stripe_off = offset + delta;
2470                         stripe_id = id;
2471                 } else {
2472                         stripe_off = 0xdeadbeef00c0ffeeULL;
2473                         stripe_id = 0xdeadbeef00c0ffeeULL;
2474                 }
2475                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2476                                   stripe_off, stripe_id);
2477         }
2478
2479         kunmap(page);
2480 }
2481
2482 static int
2483 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
2484 {
2485         u64 stripe_off;
2486         u64 stripe_id;
2487         char *addr;
2488         int delta;
2489         int rc;
2490         int rc2;
2491
2492         /* no partial pages on the client */
2493         LASSERT(count == PAGE_SIZE);
2494
2495         addr = kmap(page);
2496
2497         for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2498                 stripe_off = offset + delta;
2499                 stripe_id = id;
2500
2501                 rc2 = block_debug_check("test_brw",
2502                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
2503                                         stripe_off, stripe_id);
2504                 if (rc2 != 0) {
2505                         CERROR("Error in echo object %#llx\n", id);
2506                         rc = rc2;
2507                 }
2508         }
2509
2510         kunmap(page);
2511         return rc;
2512 }
2513
2514 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2515                             struct echo_object *eco, u64 offset,
2516                             u64 count, int async)
2517 {
2518         size_t npages;
2519         struct brw_page *pga;
2520         struct brw_page *pgp;
2521         struct page **pages;
2522         u64 off;
2523         size_t i;
2524         int rc;
2525         int verify;
2526         gfp_t gfp_mask;
2527         u32 brw_flags = 0;
2528
2529         ENTRY;
2530         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2531                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2532                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2533
2534         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
2535
2536         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2537
2538         if ((count & (~PAGE_MASK)) != 0)
2539                 RETURN(-EINVAL);
2540
2541         /* XXX think again with misaligned I/O */
2542         npages = count >> PAGE_SHIFT;
2543
2544         if (rw == OBD_BRW_WRITE)
2545                 brw_flags = OBD_BRW_ASYNC;
2546
2547         OBD_ALLOC(pga, npages * sizeof(*pga));
2548         if (!pga)
2549                 RETURN(-ENOMEM);
2550
2551         OBD_ALLOC(pages, npages * sizeof(*pages));
2552         if (!pages) {
2553                 OBD_FREE(pga, npages * sizeof(*pga));
2554                 RETURN(-ENOMEM);
2555         }
2556
2557         for (i = 0, pgp = pga, off = offset;
2558              i < npages;
2559              i++, pgp++, off += PAGE_SIZE) {
2560
2561                 LASSERT(pgp->pg == NULL);       /* for cleanup */
2562
2563                 rc = -ENOMEM;
2564                 pgp->pg = alloc_page(gfp_mask);
2565                 if (!pgp->pg)
2566                         goto out;
2567
2568                 pages[i] = pgp->pg;
2569                 pgp->count = PAGE_SIZE;
2570                 pgp->off = off;
2571                 pgp->flag = brw_flags;
2572
2573                 if (verify)
2574                         echo_client_page_debug_setup(pgp->pg, rw,
2575                                                      ostid_id(&oa->o_oi), off,
2576                                                      pgp->count);
2577         }
2578
2579         /* brw mode can only be used at client */
2580         LASSERT(ed->ed_next != NULL);
2581         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2582
2583  out:
2584         if (rc != 0 || rw != OBD_BRW_READ)
2585                 verify = 0;
2586
2587         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2588                 if (!pgp->pg)
2589                         continue;
2590
2591                 if (verify) {
2592                         int vrc;
2593
2594                         vrc = echo_client_page_debug_check(pgp->pg,
2595                                                            ostid_id(&oa->o_oi),
2596                                                            pgp->off,
2597                                                            pgp->count);
2598                         if (vrc != 0 && rc == 0)
2599                                 rc = vrc;
2600                 }
2601                 __free_page(pgp->pg);
2602         }
2603         OBD_FREE(pga, npages * sizeof(*pga));
2604         OBD_FREE(pages, npages * sizeof(*pages));
2605         RETURN(rc);
2606 }
2607
2608 static int echo_client_prep_commit(const struct lu_env *env,
2609                                    struct obd_export *exp, int rw,
2610                                    struct obdo *oa, struct echo_object *eco,
2611                                    u64 offset, u64 count,
2612                                    u64 batch, int async)
2613 {
2614         struct obd_ioobj ioo;
2615         struct niobuf_local *lnb;
2616         struct niobuf_remote rnb;
2617         u64 off;
2618         u64 npages, tot_pages, apc;
2619         int i, ret = 0, brw_flags = 0;
2620
2621         ENTRY;
2622         if (count <= 0 || (count & ~PAGE_MASK) != 0)
2623                 RETURN(-EINVAL);
2624
2625         apc = npages = batch >> PAGE_SHIFT;
2626         tot_pages = count >> PAGE_SHIFT;
2627
2628         OBD_ALLOC_LARGE(lnb, apc * sizeof(struct niobuf_local));
2629         if (!lnb)
2630                 RETURN(-ENOMEM);
2631
2632         if (rw == OBD_BRW_WRITE && async)
2633                 brw_flags |= OBD_BRW_ASYNC;
2634
2635         obdo_to_ioobj(oa, &ioo);
2636
2637         off = offset;
2638
2639         for (; tot_pages > 0; tot_pages -= npages) {
2640                 int lpages;
2641
2642                 if (tot_pages < npages)
2643                         npages = tot_pages;
2644
2645                 rnb.rnb_offset = off;
2646                 rnb.rnb_len = npages * PAGE_SIZE;
2647                 rnb.rnb_flags = brw_flags;
2648                 ioo.ioo_bufcnt = 1;
2649                 off += npages * PAGE_SIZE;
2650
2651                 lpages = npages;
2652                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
2653                 if (ret != 0)
2654                         GOTO(out, ret);
2655
2656                 for (i = 0; i < lpages; i++) {
2657                         struct page *page = lnb[i].lnb_page;
2658
2659                         /* read past eof? */
2660                         if (!page && lnb[i].lnb_rc == 0)
2661                                 continue;
2662
2663                         if (async)
2664                                 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
2665
2666                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2667                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2668                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2669                                 continue;
2670
2671                         if (rw == OBD_BRW_WRITE)
2672                                 echo_client_page_debug_setup(page, rw,
2673                                                         ostid_id(&oa->o_oi),
2674                                                         lnb[i].lnb_file_offset,
2675                                                         lnb[i].lnb_len);
2676                         else
2677                                 echo_client_page_debug_check(page,
2678                                                         ostid_id(&oa->o_oi),
2679                                                         lnb[i].lnb_file_offset,
2680                                                         lnb[i].lnb_len);
2681                 }
2682
2683                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
2684                                    ret);
2685                 if (ret != 0)
2686                         break;
2687
2688                 /* Reuse env context. */
2689                 lu_context_exit((struct lu_context *)&env->le_ctx);
2690                 lu_context_enter((struct lu_context *)&env->le_ctx);
2691         }
2692
2693 out:
2694         OBD_FREE_LARGE(lnb, apc * sizeof(struct niobuf_local));
2695
2696         RETURN(ret);
2697 }
2698
2699 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2700                                  struct obd_export *exp,
2701                                  struct obd_ioctl_data *data)
2702 {
2703         struct obd_device *obd = class_exp2obd(exp);
2704         struct echo_device *ed = obd2echo_dev(obd);
2705         struct echo_client_obd *ec = ed->ed_ec;
2706         struct obdo *oa = &data->ioc_obdo1;
2707         struct echo_object *eco;
2708         int rc;
2709         int async = 0;
2710         long test_mode;
2711
2712         ENTRY;
2713         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2714
2715         rc = echo_get_object(&eco, ed, oa);
2716         if (rc)
2717                 RETURN(rc);
2718
2719         oa->o_valid &= ~OBD_MD_FLHANDLE;
2720
2721         /* OFD/obdfilter works only via prep/commit */
2722         test_mode = (long)data->ioc_pbuf1;
2723         if (!ed->ed_next && test_mode != 3) {
2724                 test_mode = 3;
2725                 data->ioc_plen1 = data->ioc_count;
2726         }
2727
2728         if (test_mode == 3)
2729                 async = 1;
2730
2731         /* Truncate batch size to maximum */
2732         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2733                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2734
2735         switch (test_mode) {
2736         case 1:
2737                 /* fall through */
2738         case 2:
2739                 rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
2740                                       data->ioc_count, async);
2741                 break;
2742         case 3:
2743                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
2744                                              data->ioc_offset, data->ioc_count,
2745                                              data->ioc_plen1, async);
2746                 break;
2747         default:
2748                 rc = -EINVAL;
2749         }
2750
2751         echo_put_object(eco);
2752
2753         RETURN(rc);
2754 }
2755
2756 static int
2757 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2758                       void *karg, void __user *uarg)
2759 {
2760 #ifdef HAVE_SERVER_SUPPORT
2761         struct tgt_session_info *tsi;
2762 #endif
2763         struct obd_device      *obd = exp->exp_obd;
2764         struct echo_device     *ed = obd2echo_dev(obd);
2765         struct echo_client_obd *ec = ed->ed_ec;
2766         struct echo_object     *eco;
2767         struct obd_ioctl_data  *data = karg;
2768         struct lu_env          *env;
2769         unsigned long           env_tags = 0;
2770         __u16                   refcheck;
2771         struct obdo            *oa;
2772         struct lu_fid           fid;
2773         int                     rw = OBD_BRW_READ;
2774         int                     rc = 0;
2775
2776         ENTRY;
2777         oa = &data->ioc_obdo1;
2778         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2779                 oa->o_valid |= OBD_MD_FLGROUP;
2780                 ostid_set_seq_echo(&oa->o_oi);
2781         }
2782
2783         /* This FID is unpacked just for validation at this point */
2784         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2785         if (rc < 0)
2786                 RETURN(rc);
2787
2788         env = cl_env_get(&refcheck);
2789         if (IS_ERR(env))
2790                 RETURN(PTR_ERR(env));
2791
2792         lu_env_add(env);
2793
2794 #ifdef HAVE_SERVER_SUPPORT
2795         if (cmd == OBD_IOC_ECHO_MD || cmd == OBD_IOC_ECHO_ALLOC_SEQ)
2796                 env_tags = ECHO_MD_CTX_TAG;
2797         else
2798 #endif
2799                 env_tags = ECHO_DT_CTX_TAG;
2800
2801         rc = lu_env_refill_by_tags(env, env_tags, ECHO_SES_TAG);
2802         if (rc != 0)
2803                 GOTO(out, rc);
2804
2805 #ifdef HAVE_SERVER_SUPPORT
2806         tsi = tgt_ses_info(env);
2807         /* treat as local operation */
2808         tsi->tsi_exp = NULL;
2809         tsi->tsi_jobid = NULL;
2810 #endif
2811
2812         switch (cmd) {
2813         case OBD_IOC_CREATE:                    /* may create echo object */
2814                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2815                         GOTO(out, rc = -EPERM);
2816
2817                 rc = echo_create_object(env, ed, oa);
2818                 GOTO(out, rc);
2819
2820 #ifdef HAVE_SERVER_SUPPORT
2821         case OBD_IOC_ECHO_MD: {
2822                 int count;
2823                 int cmd;
2824                 char *dir = NULL;
2825                 int dirlen;
2826                 __u64 id;
2827
2828                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2829                         GOTO(out, rc = -EPERM);
2830
2831                 count = data->ioc_count;
2832                 cmd = data->ioc_command;
2833
2834                 id = data->ioc_obdo2.o_oi.oi.oi_id;
2835                 dirlen = data->ioc_plen1;
2836                 OBD_ALLOC(dir, dirlen + 1);
2837                 if (!dir)
2838                         GOTO(out, rc = -ENOMEM);
2839
2840                 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2841                         OBD_FREE(dir, data->ioc_plen1 + 1);
2842                         GOTO(out, rc = -EFAULT);
2843                 }
2844
2845                 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2846                 OBD_FREE(dir, dirlen + 1);
2847                 GOTO(out, rc);
2848         }
2849         case OBD_IOC_ECHO_ALLOC_SEQ: {
2850                 __u64            seq;
2851                 int              max_count;
2852
2853                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2854                         GOTO(out, rc = -EPERM);
2855
2856                 rc = seq_client_get_seq(env, ed->ed_cl_seq, &seq);
2857                 if (rc < 0) {
2858                         CERROR("%s: Can not alloc seq: rc = %d\n",
2859                                obd->obd_name, rc);
2860                         GOTO(out, rc);
2861                 }
2862
2863                 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2864                         return -EFAULT;
2865
2866                 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2867                 if (copy_to_user(data->ioc_pbuf2, &max_count,
2868                                      data->ioc_plen2))
2869                         return -EFAULT;
2870                 GOTO(out, rc);
2871         }
2872 #endif /* HAVE_SERVER_SUPPORT */
2873         case OBD_IOC_DESTROY:
2874                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2875                         GOTO(out, rc = -EPERM);
2876
2877                 rc = echo_get_object(&eco, ed, oa);
2878                 if (rc == 0) {
2879                         rc = obd_destroy(env, ec->ec_exp, oa);
2880                         if (rc == 0)
2881                                 eco->eo_deleted = 1;
2882                         echo_put_object(eco);
2883                 }
2884                 GOTO(out, rc);
2885
2886         case OBD_IOC_GETATTR:
2887                 rc = echo_get_object(&eco, ed, oa);
2888                 if (rc == 0) {
2889                         rc = obd_getattr(env, ec->ec_exp, oa);
2890                         echo_put_object(eco);
2891                 }
2892                 GOTO(out, rc);
2893
2894         case OBD_IOC_SETATTR:
2895                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2896                         GOTO(out, rc = -EPERM);
2897
2898                 rc = echo_get_object(&eco, ed, oa);
2899                 if (rc == 0) {
2900                         rc = obd_setattr(env, ec->ec_exp, oa);
2901                         echo_put_object(eco);
2902                 }
2903                 GOTO(out, rc);
2904
2905         case OBD_IOC_BRW_WRITE:
2906                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2907                         GOTO(out, rc = -EPERM);
2908
2909                 rw = OBD_BRW_WRITE;
2910                 /* fall through */
2911         case OBD_IOC_BRW_READ:
2912                 rc = echo_client_brw_ioctl(env, rw, exp, data);
2913                 GOTO(out, rc);
2914
2915         default:
2916                 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2917                 GOTO(out, rc = -ENOTTY);
2918         }
2919
2920         EXIT;
2921 out:
2922         lu_env_remove(env);
2923         cl_env_put(env, &refcheck);
2924
2925         return rc;
2926 }
2927
2928 static int echo_client_setup(const struct lu_env *env,
2929                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2930 {
2931         struct echo_client_obd *ec = &obddev->u.echo_client;
2932         struct obd_device *tgt;
2933         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2934         struct obd_connect_data *ocd = NULL;
2935         int rc;
2936
2937         ENTRY;
2938         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2939                 CERROR("requires a TARGET OBD name\n");
2940                 RETURN(-EINVAL);
2941         }
2942
2943         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2944         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2945                 CERROR("device not attached or not set up (%s)\n",
2946                        lustre_cfg_string(lcfg, 1));
2947                 RETURN(-EINVAL);
2948         }
2949
2950         spin_lock_init(&ec->ec_lock);
2951         INIT_LIST_HEAD(&ec->ec_objects);
2952         INIT_LIST_HEAD(&ec->ec_locks);
2953         ec->ec_unique = 0;
2954
2955         lu_context_tags_update(ECHO_DT_CTX_TAG);
2956         lu_session_tags_update(ECHO_SES_TAG);
2957
2958         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2959 #ifdef HAVE_SERVER_SUPPORT
2960                 lu_context_tags_update(ECHO_MD_CTX_TAG);
2961 #else
2962                 CERROR(
2963                        "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
2964 #endif
2965                 RETURN(0);
2966         }
2967
2968         OBD_ALLOC(ocd, sizeof(*ocd));
2969         if (!ocd) {
2970                 CERROR("Can't alloc ocd connecting to %s\n",
2971                        lustre_cfg_string(lcfg, 1));
2972                 return -ENOMEM;
2973         }
2974
2975         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2976                                  OBD_CONNECT_BRW_SIZE |
2977                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2978                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2979                                  OBD_CONNECT_FID;
2980         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2981         ocd->ocd_version = LUSTRE_VERSION_CODE;
2982         ocd->ocd_group = FID_SEQ_ECHO;
2983
2984         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2985         if (rc == 0) {
2986                 /* Turn off pinger because it connects to tgt obd directly. */
2987                 spin_lock(&tgt->obd_dev_lock);
2988                 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2989                 spin_unlock(&tgt->obd_dev_lock);
2990         }
2991
2992         OBD_FREE(ocd, sizeof(*ocd));
2993
2994         if (rc != 0) {
2995                 CERROR("fail to connect to device %s\n",
2996                        lustre_cfg_string(lcfg, 1));
2997                 return rc;
2998         }
2999
3000         RETURN(rc);
3001 }
3002
3003 static int echo_client_cleanup(struct obd_device *obddev)
3004 {
3005         struct echo_device *ed = obd2echo_dev(obddev);
3006         struct echo_client_obd *ec = &obddev->u.echo_client;
3007         int rc;
3008
3009         ENTRY;
3010         /*Do nothing for Metadata echo client*/
3011         if (!ed)
3012                 RETURN(0);
3013
3014         lu_session_tags_clear(ECHO_SES_TAG & ~LCT_SESSION);
3015         lu_context_tags_clear(ECHO_DT_CTX_TAG);
3016         if (ed->ed_next_ismd) {
3017 #ifdef HAVE_SERVER_SUPPORT
3018                 lu_context_tags_clear(ECHO_MD_CTX_TAG);
3019 #else
3020                 CERROR(
3021                        "This is client-side only module, does not support metadata echo client.\n");
3022 #endif
3023                 RETURN(0);
3024         }
3025
3026         if (!list_empty(&obddev->obd_exports)) {
3027                 CERROR("still has clients!\n");
3028                 RETURN(-EBUSY);
3029         }
3030
3031         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
3032         rc = obd_disconnect(ec->ec_exp);
3033         if (rc != 0)
3034                 CERROR("fail to disconnect device: %d\n", rc);
3035
3036         RETURN(rc);
3037 }
3038
3039 static int echo_client_connect(const struct lu_env *env,
3040                                struct obd_export **exp,
3041                                struct obd_device *src, struct obd_uuid *cluuid,
3042                                struct obd_connect_data *data, void *localdata)
3043 {
3044         int rc;
3045         struct lustre_handle conn = { 0 };
3046
3047         ENTRY;
3048         rc = class_connect(&conn, src, cluuid);
3049         if (rc == 0)
3050                 *exp = class_conn2export(&conn);
3051
3052         RETURN(rc);
3053 }
3054
3055 static int echo_client_disconnect(struct obd_export *exp)
3056 {
3057         int rc;
3058
3059         ENTRY;
3060         if (!exp)
3061                 GOTO(out, rc = -EINVAL);
3062
3063         rc = class_disconnect(exp);
3064         GOTO(out, rc);
3065 out:
3066         return rc;
3067 }
3068
3069 static struct obd_ops echo_client_obd_ops = {
3070         .o_owner       = THIS_MODULE,
3071         .o_iocontrol   = echo_client_iocontrol,
3072         .o_connect     = echo_client_connect,
3073         .o_disconnect  = echo_client_disconnect
3074 };
3075
3076 static int __init obdecho_init(void)
3077 {
3078         int rc;
3079
3080         ENTRY;
3081         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
3082
3083         LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
3084
3085 # ifdef HAVE_SERVER_SUPPORT
3086         rc = echo_persistent_pages_init();
3087         if (rc != 0)
3088                 goto failed_0;
3089
3090         rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
3091                                  LUSTRE_ECHO_NAME, &echo_srv_type);
3092         if (rc != 0)
3093                 goto failed_1;
3094 # endif
3095
3096         rc = lu_kmem_init(echo_caches);
3097         if (rc == 0) {
3098                 rc = class_register_type(&echo_client_obd_ops, NULL, false,
3099                                          NULL, LUSTRE_ECHO_CLIENT_NAME,
3100                                          &echo_device_type);
3101                 if (rc)
3102                         lu_kmem_fini(echo_caches);
3103         }
3104
3105 # ifdef HAVE_SERVER_SUPPORT
3106         if (rc == 0)
3107                 RETURN(0);
3108
3109         class_unregister_type(LUSTRE_ECHO_NAME);
3110 failed_1:
3111         echo_persistent_pages_fini();
3112 failed_0:
3113 # endif
3114         RETURN(rc);
3115 }
3116
3117 static void __exit obdecho_exit(void)
3118 {
3119         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
3120         lu_kmem_fini(echo_caches);
3121
3122 #ifdef HAVE_SERVER_SUPPORT
3123         class_unregister_type(LUSTRE_ECHO_NAME);
3124         echo_persistent_pages_fini();
3125 #endif
3126 }
3127
/* Kernel module metadata, followed by the load/unload entry points. */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Echo Client test driver");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

/* obdecho_init() runs at module load, obdecho_exit() at module unload. */
module_init(obdecho_init);
module_exit(obdecho_exit);
3135
3136 /** @} echo_client */