Whamcloud - gitweb
ea82f267c739792506cc4697b7d4d05701bb7b7e
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_ECHO
38
39 #include <linux/user_namespace.h>
40 #ifdef HAVE_UIDGID_HEADER
41 # include <linux/uidgid.h>
42 #endif
43 #include <libcfs/libcfs.h>
44
45 #include <obd.h>
46 #include <obd_support.h>
47 #include <obd_class.h>
48 #include <lustre_debug.h>
49 #include <lprocfs_status.h>
50 #include <cl_object.h>
51 #include <lustre_fid.h>
52 #include <lustre_acl.h>
53 #include <lustre_ioctl.h>
54 #include <lustre_net.h>
55 #ifdef HAVE_SERVER_SUPPORT
56 # include <md_object.h>
57 #endif /* HAVE_SERVER_SUPPORT */
58
59 #include "echo_internal.h"
60
61 /** \defgroup echo_client Echo Client
62  * @{
63  */
64
65 struct echo_device {
66         struct cl_device          ed_cl;
67         struct echo_client_obd   *ed_ec;
68
69         struct cl_site            ed_site_myself;
70         struct lu_site           *ed_site;
71         struct lu_device         *ed_next;
72         int                       ed_next_ismd;
73         struct lu_client_seq     *ed_cl_seq;
74 #ifdef HAVE_SERVER_SUPPORT
75         struct local_oid_storage *ed_los;
76         struct lu_fid             ed_root_fid;
77 #endif /* HAVE_SERVER_SUPPORT */
78 };
79
80 struct echo_object {
81         struct cl_object        eo_cl;
82         struct cl_object_header eo_hdr;
83         struct echo_device     *eo_dev;
84         struct list_head        eo_obj_chain;
85         struct lov_oinfo       *eo_oinfo;
86         atomic_t                eo_npages;
87         int                     eo_deleted;
88 };
89
90 struct echo_object_conf {
91         struct cl_object_conf   eoc_cl;
92         struct lov_oinfo      **eoc_oinfo;
93 };
94
95 struct echo_page {
96         struct cl_page_slice    ep_cl;
97         struct mutex            ep_lock;
98 };
99
100 struct echo_lock {
101         struct cl_lock_slice    el_cl;
102         struct list_head        el_chain;
103         struct echo_object     *el_object;
104         __u64                   el_cookie;
105         atomic_t                el_refcount;
106 };
107
108 #ifdef HAVE_SERVER_SUPPORT
109 static const char echo_md_root_dir_name[] = "ROOT_ECHO";
110
111 /**
112  * In order to use the values of members in struct mdd_device,
113  * we define an alias structure here.
114  */
115 struct echo_md_device {
116         struct md_device                 emd_md_dev;
117         struct obd_export               *emd_child_exp;
118         struct dt_device                *emd_child;
119         struct dt_device                *emd_bottom;
120         struct lu_fid                    emd_root_fid;
121         struct lu_fid                    emd_local_root_fid;
122 };
123 #endif /* HAVE_SERVER_SUPPORT */
124
125 static int echo_client_setup(const struct lu_env *env,
126                              struct obd_device *obddev,
127                              struct lustre_cfg *lcfg);
128 static int echo_client_cleanup(struct obd_device *obddev);
129
130
131 /** \defgroup echo_helpers Helper functions
132  * @{
133  */
134 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
135 {
136         return container_of0(dev, struct echo_device, ed_cl);
137 }
138
139 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
140 {
141         return &d->ed_cl;
142 }
143
144 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
145 {
146         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
147 }
148
149 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
150 {
151         return &eco->eo_cl;
152 }
153
154 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
155 {
156         return container_of(o, struct echo_object, eo_cl);
157 }
158
159 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
160 {
161         return container_of(s, struct echo_page, ep_cl);
162 }
163
164 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
165 {
166         return container_of(s, struct echo_lock, el_cl);
167 }
168
169 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
170 {
171         return ecl->el_cl.cls_lock;
172 }
173
174 static struct lu_context_key echo_thread_key;
175 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
176 {
177         struct echo_thread_info *info;
178         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
179         LASSERT(info != NULL);
180         return info;
181 }
182
183 static inline
184 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
185 {
186         return container_of(c, struct echo_object_conf, eoc_cl);
187 }
188
189 #ifdef HAVE_SERVER_SUPPORT
190 static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
191 {
192         return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
193 }
194
195 static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
196 {
197         return &d->emd_md_dev.md_lu_dev;
198 }
199
200 static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
201 {
202         return emd2lu_dev(d)->ld_site->ld_seq_site;
203 }
204
205 static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
206 {
207         return d->emd_md_dev.md_lu_dev.ld_obd;
208 }
209 #endif /* HAVE_SERVER_SUPPORT */
210
211 /** @} echo_helpers */
212
213 static int cl_echo_object_put(struct echo_object *eco);
214 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
215                               struct page **pages, int npages, int async);
216
217 struct echo_thread_info {
218         struct echo_object_conf eti_conf;
219         struct lustre_md        eti_md;
220
221         struct cl_2queue        eti_queue;
222         struct cl_io            eti_io;
223         struct cl_lock          eti_lock;
224         struct lu_fid           eti_fid;
225         struct lu_fid           eti_fid2;
226 #ifdef HAVE_SERVER_SUPPORT
227         struct md_op_spec       eti_spec;
228         struct lov_mds_md_v3    eti_lmm;
229         struct lov_user_md_v3   eti_lum;
230         struct md_attr          eti_ma;
231         struct lu_name          eti_lname;
232         /* per-thread values, can be re-used */
233         void                    *eti_big_lmm; /* may be vmalloc'd */
234         int                     eti_big_lmmsize;
235         char                    eti_name[20];
236         struct lu_buf           eti_buf;
237         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
238 #endif
239 };
240
241 /* No session used right now */
242 struct echo_session_info {
243         unsigned long dummy;
244 };
245
246 static struct kmem_cache *echo_lock_kmem;
247 static struct kmem_cache *echo_object_kmem;
248 static struct kmem_cache *echo_thread_kmem;
249 static struct kmem_cache *echo_session_kmem;
250 /* static struct kmem_cache *echo_req_kmem; */
251
252 static struct lu_kmem_descr echo_caches[] = {
253         {
254                 .ckd_cache = &echo_lock_kmem,
255                 .ckd_name  = "echo_lock_kmem",
256                 .ckd_size  = sizeof (struct echo_lock)
257         },
258         {
259                 .ckd_cache = &echo_object_kmem,
260                 .ckd_name  = "echo_object_kmem",
261                 .ckd_size  = sizeof (struct echo_object)
262         },
263         {
264                 .ckd_cache = &echo_thread_kmem,
265                 .ckd_name  = "echo_thread_kmem",
266                 .ckd_size  = sizeof (struct echo_thread_info)
267         },
268         {
269                 .ckd_cache = &echo_session_kmem,
270                 .ckd_name  = "echo_session_kmem",
271                 .ckd_size  = sizeof (struct echo_session_info)
272         },
273         {
274                 .ckd_cache = NULL
275         }
276 };
277
278 /** \defgroup echo_page Page operations
279  *
280  * Echo page operations.
281  *
282  * @{
283  */
284 static int echo_page_own(const struct lu_env *env,
285                          const struct cl_page_slice *slice,
286                          struct cl_io *io, int nonblock)
287 {
288         struct echo_page *ep = cl2echo_page(slice);
289
290         if (!nonblock)
291                 mutex_lock(&ep->ep_lock);
292         else if (!mutex_trylock(&ep->ep_lock))
293                 return -EAGAIN;
294         return 0;
295 }
296
297 static void echo_page_disown(const struct lu_env *env,
298                              const struct cl_page_slice *slice,
299                              struct cl_io *io)
300 {
301         struct echo_page *ep = cl2echo_page(slice);
302
303         LASSERT(mutex_is_locked(&ep->ep_lock));
304         mutex_unlock(&ep->ep_lock);
305 }
306
307 static void echo_page_discard(const struct lu_env *env,
308                               const struct cl_page_slice *slice,
309                               struct cl_io *unused)
310 {
311         cl_page_delete(env, slice->cpl_page);
312 }
313
314 static int echo_page_is_vmlocked(const struct lu_env *env,
315                                  const struct cl_page_slice *slice)
316 {
317         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
318                 return -EBUSY;
319         return -ENODATA;
320 }
321
322 static void echo_page_completion(const struct lu_env *env,
323                                  const struct cl_page_slice *slice,
324                                  int ioret)
325 {
326         LASSERT(slice->cpl_page->cp_sync_io != NULL);
327 }
328
329 static void echo_page_fini(const struct lu_env *env,
330                            struct cl_page_slice *slice)
331 {
332         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
333         ENTRY;
334
335         atomic_dec(&eco->eo_npages);
336         page_cache_release(slice->cpl_page->cp_vmpage);
337         EXIT;
338 }
339
340 static int echo_page_prep(const struct lu_env *env,
341                           const struct cl_page_slice *slice,
342                           struct cl_io *unused)
343 {
344         return 0;
345 }
346
347 static int echo_page_print(const struct lu_env *env,
348                            const struct cl_page_slice *slice,
349                            void *cookie, lu_printer_t printer)
350 {
351         struct echo_page *ep = cl2echo_page(slice);
352
353         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
354                    ep, mutex_is_locked(&ep->ep_lock),
355                    slice->cpl_page->cp_vmpage);
356         return 0;
357 }
358
359 static const struct cl_page_operations echo_page_ops = {
360         .cpo_own           = echo_page_own,
361         .cpo_disown        = echo_page_disown,
362         .cpo_discard       = echo_page_discard,
363         .cpo_fini          = echo_page_fini,
364         .cpo_print         = echo_page_print,
365         .cpo_is_vmlocked   = echo_page_is_vmlocked,
366         .io = {
367                 [CRT_READ] = {
368                         .cpo_prep        = echo_page_prep,
369                         .cpo_completion  = echo_page_completion,
370                 },
371                 [CRT_WRITE] = {
372                         .cpo_prep        = echo_page_prep,
373                         .cpo_completion  = echo_page_completion,
374                 }
375         }
376 };
377 /** @} echo_page */
378
379 /** \defgroup echo_lock Locking
380  *
381  * echo lock operations
382  *
383  * @{
384  */
385 static void echo_lock_fini(const struct lu_env *env,
386                            struct cl_lock_slice *slice)
387 {
388         struct echo_lock *ecl = cl2echo_lock(slice);
389
390         LASSERT(list_empty(&ecl->el_chain));
391         OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
392 }
393
394 static struct cl_lock_operations echo_lock_ops = {
395         .clo_fini      = echo_lock_fini,
396 };
397
398 /** @} echo_lock */
399
400 /** \defgroup echo_cl_ops cl_object operations
401  *
402  * operations for cl_object
403  *
404  * @{
405  */
406 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
407                           struct cl_page *page, pgoff_t index)
408 {
409         struct echo_page *ep = cl_object_page_slice(obj, page);
410         struct echo_object *eco = cl2echo_obj(obj);
411         ENTRY;
412
413         page_cache_get(page->cp_vmpage);
414         mutex_init(&ep->ep_lock);
415         cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
416         atomic_inc(&eco->eo_npages);
417         RETURN(0);
418 }
419
420 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
421                         struct cl_io *io)
422 {
423         return 0;
424 }
425
426 static int echo_lock_init(const struct lu_env *env,
427                           struct cl_object *obj, struct cl_lock *lock,
428                           const struct cl_io *unused)
429 {
430         struct echo_lock *el;
431         ENTRY;
432
433         OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
434         if (el != NULL) {
435                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
436                 el->el_object = cl2echo_obj(obj);
437                 INIT_LIST_HEAD(&el->el_chain);
438                 atomic_set(&el->el_refcount, 0);
439         }
440         RETURN(el == NULL ? -ENOMEM : 0);
441 }
442
443 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
444                          const struct cl_object_conf *conf)
445 {
446         return 0;
447 }
448
449 static const struct cl_object_operations echo_cl_obj_ops = {
450         .coo_page_init = echo_page_init,
451         .coo_lock_init = echo_lock_init,
452         .coo_io_init   = echo_io_init,
453         .coo_conf_set  = echo_conf_set
454 };
455 /** @} echo_cl_ops */
456
457 /** \defgroup echo_lu_ops lu_object operations
458  *
459  * operations for echo lu object.
460  *
461  * @{
462  */
463 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
464                             const struct lu_object_conf *conf)
465 {
466         struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
467         struct echo_client_obd *ec     = ed->ed_ec;
468         struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
469         ENTRY;
470
471         if (ed->ed_next) {
472                 struct lu_object  *below;
473                 struct lu_device  *under;
474
475                 under = ed->ed_next;
476                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
477                                                         under);
478                 if (below == NULL)
479                         RETURN(-ENOMEM);
480                 lu_object_add(obj, below);
481         }
482
483         if (!ed->ed_next_ismd) {
484                 const struct cl_object_conf *cconf = lu2cl_conf(conf);
485                 struct echo_object_conf *econf = cl2echo_conf(cconf);
486
487                 LASSERT(econf->eoc_oinfo != NULL);
488
489                 /* Transfer the oinfo pointer to eco that it won't be
490                  * freed. */
491                 eco->eo_oinfo = *econf->eoc_oinfo;
492                 *econf->eoc_oinfo = NULL;
493         } else {
494                 eco->eo_oinfo = NULL;
495         }
496
497         eco->eo_dev = ed;
498         atomic_set(&eco->eo_npages, 0);
499         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
500
501         spin_lock(&ec->ec_lock);
502         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
503         spin_unlock(&ec->ec_lock);
504
505         RETURN(0);
506 }
507
508 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
509 {
510         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
511         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
512         ENTRY;
513
514         LASSERT(atomic_read(&eco->eo_npages) == 0);
515
516         spin_lock(&ec->ec_lock);
517         list_del_init(&eco->eo_obj_chain);
518         spin_unlock(&ec->ec_lock);
519
520         lu_object_fini(obj);
521         lu_object_header_fini(obj->lo_header);
522
523         if (eco->eo_oinfo != NULL)
524                 OBD_FREE_PTR(eco->eo_oinfo);
525
526         OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
527         EXIT;
528 }
529
530 static int echo_object_print(const struct lu_env *env, void *cookie,
531                             lu_printer_t p, const struct lu_object *o)
532 {
533         struct echo_object *obj = cl2echo_obj(lu2cl(o));
534
535         return (*p)(env, cookie, "echoclient-object@%p", obj);
536 }
537
538 static const struct lu_object_operations echo_lu_obj_ops = {
539         .loo_object_init      = echo_object_init,
540         .loo_object_delete    = NULL,
541         .loo_object_release   = NULL,
542         .loo_object_free      = echo_object_free,
543         .loo_object_print     = echo_object_print,
544         .loo_object_invariant = NULL
545 };
546 /** @} echo_lu_ops */
547
548 /** \defgroup echo_lu_dev_ops  lu_device operations
549  *
550  * Operations for echo lu device.
551  *
552  * @{
553  */
554 static struct lu_object *echo_object_alloc(const struct lu_env *env,
555                                            const struct lu_object_header *hdr,
556                                            struct lu_device *dev)
557 {
558         struct echo_object *eco;
559         struct lu_object *obj = NULL;
560         ENTRY;
561
562         /* we're the top dev. */
563         LASSERT(hdr == NULL);
564         OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
565         if (eco != NULL) {
566                 struct cl_object_header *hdr = &eco->eo_hdr;
567
568                 obj = &echo_obj2cl(eco)->co_lu;
569                 cl_object_header_init(hdr);
570                 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
571
572                 lu_object_init(obj, &hdr->coh_lu, dev);
573                 lu_object_add_top(&hdr->coh_lu, obj);
574
575                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
576                 obj->lo_ops       = &echo_lu_obj_ops;
577         }
578         RETURN(obj);
579 }
580
581 static struct lu_device_operations echo_device_lu_ops = {
582         .ldo_object_alloc   = echo_object_alloc,
583 };
584
585 /** @} echo_lu_dev_ops */
586
587 /** \defgroup echo_init Setup and teardown
588  *
589  * Init and fini functions for echo client.
590  *
591  * @{
592  */
593 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
594 {
595         struct cl_site *site = &ed->ed_site_myself;
596         int rc;
597
598         /* initialize site */
599         rc = cl_site_init(site, &ed->ed_cl);
600         if (rc) {
601                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
602                 return rc;
603         }
604
605         rc = lu_site_init_finish(&site->cs_lu);
606         if (rc) {
607                 cl_site_fini(site);
608                 return rc;
609         }
610
611         ed->ed_site = &site->cs_lu;
612         return 0;
613 }
614
615 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
616 {
617         if (ed->ed_site) {
618                 if (!ed->ed_next_ismd)
619                         lu_site_fini(ed->ed_site);
620                 ed->ed_site = NULL;
621         }
622 }
623
624 static void *echo_thread_key_init(const struct lu_context *ctx,
625                                   struct lu_context_key *key)
626 {
627         struct echo_thread_info *info;
628
629         OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
630         if (info == NULL)
631                 info = ERR_PTR(-ENOMEM);
632         return info;
633 }
634
635 static void echo_thread_key_fini(const struct lu_context *ctx,
636                          struct lu_context_key *key, void *data)
637 {
638         struct echo_thread_info *info = data;
639         OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
640 }
641
642 static struct lu_context_key echo_thread_key = {
643         .lct_tags = LCT_CL_THREAD,
644         .lct_init = echo_thread_key_init,
645         .lct_fini = echo_thread_key_fini,
646 };
647
648 static void *echo_session_key_init(const struct lu_context *ctx,
649                                   struct lu_context_key *key)
650 {
651         struct echo_session_info *session;
652
653         OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
654         if (session == NULL)
655                 session = ERR_PTR(-ENOMEM);
656         return session;
657 }
658
659 static void echo_session_key_fini(const struct lu_context *ctx,
660                                  struct lu_context_key *key, void *data)
661 {
662         struct echo_session_info *session = data;
663         OBD_SLAB_FREE_PTR(session, echo_session_kmem);
664 }
665
666 static struct lu_context_key echo_session_key = {
667         .lct_tags = LCT_SESSION,
668         .lct_init = echo_session_key_init,
669         .lct_fini = echo_session_key_fini,
670 };
671
672 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
673
674 #ifdef HAVE_SERVER_SUPPORT
675 # define ECHO_SEQ_WIDTH 0xffffffff
676 static int echo_fid_init(struct echo_device *ed, char *obd_name,
677                          struct seq_server_site *ss)
678 {
679         char *prefix;
680         int rc;
681         ENTRY;
682
683         OBD_ALLOC_PTR(ed->ed_cl_seq);
684         if (ed->ed_cl_seq == NULL)
685                 RETURN(-ENOMEM);
686
687         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
688         if (prefix == NULL)
689                 GOTO(out_free_seq, rc = -ENOMEM);
690
691         snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
692
693         /* Init client side sequence-manager */
694         rc = seq_client_init(ed->ed_cl_seq, NULL,
695                              LUSTRE_SEQ_METADATA,
696                              prefix, ss->ss_server_seq);
697         ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
698         OBD_FREE(prefix, MAX_OBD_NAME + 5);
699         if (rc)
700                 GOTO(out_free_seq, rc);
701
702         RETURN(0);
703
704 out_free_seq:
705         OBD_FREE_PTR(ed->ed_cl_seq);
706         ed->ed_cl_seq = NULL;
707         RETURN(rc);
708 }
709
710 static int echo_fid_fini(struct obd_device *obddev)
711 {
712         struct echo_device *ed = obd2echo_dev(obddev);
713         ENTRY;
714
715         if (ed->ed_cl_seq != NULL) {
716                 seq_client_fini(ed->ed_cl_seq);
717                 OBD_FREE_PTR(ed->ed_cl_seq);
718                 ed->ed_cl_seq = NULL;
719         }
720
721         RETURN(0);
722 }
723
724 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
725 {
726         ENTRY;
727
728         if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
729                 local_oid_storage_fini(env, ed->ed_los);
730                 ed->ed_los = NULL;
731         }
732 }
733
734 static int
735 echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
736                           struct local_oid_storage *los,
737                           const struct lu_fid *pfid, const char *name,
738                           __u32 mode, struct lu_fid *fid)
739 {
740         struct dt_object        *parent = NULL;
741         struct dt_object        *dto = NULL;
742         int                      rc = 0;
743         ENTRY;
744
745         LASSERT(!fid_is_zero(pfid));
746         parent = dt_locate(env, emd->emd_bottom, pfid);
747         if (unlikely(IS_ERR(parent)))
748                 RETURN(PTR_ERR(parent));
749
750         /* create local file with @fid */
751         dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
752                                                  parent, name, mode);
753         if (IS_ERR(dto))
754                 GOTO(out_put, rc = PTR_ERR(dto));
755
756         *fid = *lu_object_fid(&dto->do_lu);
757         /* since stack is not fully set up the local_storage uses own stack
758          * and we should drop its object from cache */
759         lu_object_put_nocache(env, &dto->do_lu);
760
761         EXIT;
762 out_put:
763         lu_object_put(env, &parent->do_lu);
764         RETURN(rc);
765 }
766
767 static int
768 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
769                  struct echo_device *ed)
770 {
771         struct lu_fid                    fid;
772         int                              rc = 0;
773         ENTRY;
774
775         /* Setup local dirs */
776         fid.f_seq = FID_SEQ_LOCAL_NAME;
777         fid.f_oid = 1;
778         fid.f_ver = 0;
779         rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
780         if (rc != 0)
781                 RETURN(rc);
782
783         lu_echo_root_fid(&fid);
784         if (echo_md_seq_site(emd)->ss_node_id == 0) {
785                 rc = echo_md_local_file_create(env, emd, ed->ed_los,
786                                                &emd->emd_local_root_fid,
787                                                echo_md_root_dir_name, S_IFDIR |
788                                                S_IRUGO | S_IWUSR | S_IXUGO,
789                                                &fid);
790                 if (rc != 0) {
791                         CERROR("%s: create md echo root fid failed: rc = %d\n",
792                                emd2obd_dev(emd)->obd_name, rc);
793                         GOTO(out_los, rc);
794                 }
795         }
796         ed->ed_root_fid = fid;
797
798         RETURN(0);
799 out_los:
800         echo_ed_los_fini(env, ed);
801
802         RETURN(rc);
803 }
804 #endif /* HAVE_SERVER_SUPPORT */
805
806 static struct lu_device *echo_device_alloc(const struct lu_env *env,
807                                            struct lu_device_type *t,
808                                            struct lustre_cfg *cfg)
809 {
810         struct lu_device   *next;
811         struct echo_device *ed;
812         struct cl_device   *cd;
813         struct obd_device  *obd = NULL; /* to keep compiler happy */
814         struct obd_device  *tgt;
815         const char *tgt_type_name;
816         int rc;
817         int cleanup = 0;
818         ENTRY;
819
820         OBD_ALLOC_PTR(ed);
821         if (ed == NULL)
822                 GOTO(out, rc = -ENOMEM);
823
824         cleanup = 1;
825         cd = &ed->ed_cl;
826         rc = cl_device_init(cd, t);
827         if (rc)
828                 GOTO(out, rc);
829
830         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
831
832         cleanup = 2;
833         obd = class_name2obd(lustre_cfg_string(cfg, 0));
834         LASSERT(obd != NULL);
835         LASSERT(env != NULL);
836
837         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
838         if (tgt == NULL) {
839                 CERROR("Can not find tgt device %s\n",
840                         lustre_cfg_string(cfg, 1));
841                 GOTO(out, rc = -ENODEV);
842         }
843
844         next = tgt->obd_lu_dev;
845
846         if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
847                 ed->ed_next_ismd = 1;
848         } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
849                    strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
850                 ed->ed_next_ismd = 0;
851                 rc = echo_site_init(env, ed);
852                 if (rc)
853                         GOTO(out, rc);
854         } else {
855                 GOTO(out, rc = -EINVAL);
856         }
857
858         cleanup = 3;
859
860         rc = echo_client_setup(env, obd, cfg);
861         if (rc)
862                 GOTO(out, rc);
863
864         ed->ed_ec = &obd->u.echo_client;
865         cleanup = 4;
866
867         if (ed->ed_next_ismd) {
868 #ifdef HAVE_SERVER_SUPPORT
869                 /* Suppose to connect to some Metadata layer */
870                 struct lu_site          *ls = NULL;
871                 struct lu_device        *ld = NULL;
872                 struct md_device        *md = NULL;
873                 struct echo_md_device   *emd = NULL;
874                 int                      found = 0;
875
876                 if (next == NULL) {
877                         CERROR("%s is not lu device type!\n",
878                                lustre_cfg_string(cfg, 1));
879                         GOTO(out, rc = -EINVAL);
880                 }
881
882                 tgt_type_name = lustre_cfg_string(cfg, 2);
883                 if (!tgt_type_name) {
884                         CERROR("%s no type name for echo %s setup\n",
885                                 lustre_cfg_string(cfg, 1),
886                                 tgt->obd_type->typ_name);
887                         GOTO(out, rc = -EINVAL);
888                 }
889
890                 ls = next->ld_site;
891
892                 spin_lock(&ls->ls_ld_lock);
893                 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
894                         if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
895                                 found = 1;
896                                 break;
897                         }
898                 }
899                 spin_unlock(&ls->ls_ld_lock);
900
901                 if (found == 0) {
902                         CERROR("%s is not lu device type!\n",
903                                lustre_cfg_string(cfg, 1));
904                         GOTO(out, rc = -EINVAL);
905                 }
906
907                 next = ld;
908                 /* For MD echo client, it will use the site in MDS stack */
909                 ed->ed_site = ls;
910                 ed->ed_cl.cd_lu_dev.ld_site = ls;
911                 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
912                 if (rc) {
913                         CERROR("echo fid init error %d\n", rc);
914                         GOTO(out, rc);
915                 }
916
917                 md = lu2md_dev(next);
918                 emd = lu2emd_dev(&md->md_lu_dev);
919                 rc = echo_md_root_get(env, emd, ed);
920                 if (rc != 0) {
921                         CERROR("%s: get root error: rc = %d\n",
922                                 emd2obd_dev(emd)->obd_name, rc);
923                         GOTO(out, rc);
924                 }
925 #else /* !HAVE_SERVER_SUPPORT */
926                 CERROR("Local operations are NOT supported on client side. "
927                        "Only remote operations are supported. Metadata client "
928                        "must be run on server side.\n");
929                 GOTO(out, rc = -EOPNOTSUPP);
930 #endif /* HAVE_SERVER_SUPPORT */
931         } else {
932                  /* if echo client is to be stacked upon ost device, the next is
933                   * NULL since ost is not a clio device so far */
934                 if (next != NULL && !lu_device_is_cl(next))
935                         next = NULL;
936
937                 tgt_type_name = tgt->obd_type->typ_name;
938                 if (next != NULL) {
939                         LASSERT(next != NULL);
940                         if (next->ld_site != NULL)
941                                 GOTO(out, rc = -EBUSY);
942
943                         next->ld_site = ed->ed_site;
944                         rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
945                                                      next->ld_type->ldt_name,
946                                                      NULL);
947                         if (rc)
948                                 GOTO(out, rc);
949                 } else
950                         LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
951         }
952
953         ed->ed_next = next;
954         RETURN(&cd->cd_lu_dev);
955 out:
956         switch(cleanup) {
957         case 4: {
958                 int rc2;
959                 rc2 = echo_client_cleanup(obd);
960                 if (rc2)
961                         CERROR("Cleanup obd device %s error(%d)\n",
962                                obd->obd_name, rc2);
963         }
964
965         case 3:
966                 echo_site_fini(env, ed);
967         case 2:
968                 cl_device_fini(&ed->ed_cl);
969         case 1:
970                 OBD_FREE_PTR(ed);
971         case 0:
972         default:
973                 break;
974         }
975         return(ERR_PTR(rc));
976 }
977
978 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
979                           const char *name, struct lu_device *next)
980 {
981         LBUG();
982         return 0;
983 }
984
985 static struct lu_device *echo_device_fini(const struct lu_env *env,
986                                           struct lu_device *d)
987 {
988         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
989         struct lu_device *next = ed->ed_next;
990
991         while (next && !ed->ed_next_ismd)
992                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
993         return NULL;
994 }
995
996 static void echo_lock_release(const struct lu_env *env,
997                               struct echo_lock *ecl,
998                               int still_used)
999 {
1000         struct cl_lock *clk = echo_lock2cl(ecl);
1001
1002         cl_lock_release(env, clk);
1003 }
1004
1005 static struct lu_device *echo_device_free(const struct lu_env *env,
1006                                           struct lu_device *d)
1007 {
1008         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
1009         struct echo_client_obd *ec   = ed->ed_ec;
1010         struct echo_object     *eco;
1011         struct lu_device       *next = ed->ed_next;
1012
1013         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
1014                ed, next);
1015
1016         lu_site_purge(env, ed->ed_site, -1);
1017
1018         /* check if there are objects still alive.
1019          * It shouldn't have any object because lu_site_purge would cleanup
1020          * all of cached objects. Anyway, probably the echo device is being
1021          * parallelly accessed.
1022          */
1023         spin_lock(&ec->ec_lock);
1024         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
1025                 eco->eo_deleted = 1;
1026         spin_unlock(&ec->ec_lock);
1027
1028         /* purge again */
1029         lu_site_purge(env, ed->ed_site, -1);
1030
1031         CDEBUG(D_INFO,
1032                "Waiting for the reference of echo object to be dropped\n");
1033
1034         /* Wait for the last reference to be dropped. */
1035         spin_lock(&ec->ec_lock);
1036         while (!list_empty(&ec->ec_objects)) {
1037                 spin_unlock(&ec->ec_lock);
1038                 CERROR("echo_client still has objects at cleanup time, "
1039                        "wait for 1 second\n");
1040                 set_current_state(TASK_UNINTERRUPTIBLE);
1041                 schedule_timeout(cfs_time_seconds(1));
1042                 lu_site_purge(env, ed->ed_site, -1);
1043                 spin_lock(&ec->ec_lock);
1044         }
1045         spin_unlock(&ec->ec_lock);
1046
1047         LASSERT(list_empty(&ec->ec_locks));
1048
1049         CDEBUG(D_INFO, "No object exists, exiting...\n");
1050
1051         echo_client_cleanup(d->ld_obd);
1052 #ifdef HAVE_SERVER_SUPPORT
1053         echo_fid_fini(d->ld_obd);
1054         echo_ed_los_fini(env, ed);
1055 #endif
1056         while (next && !ed->ed_next_ismd)
1057                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
1058
1059         LASSERT(ed->ed_site == d->ld_site);
1060         echo_site_fini(env, ed);
1061         cl_device_fini(&ed->ed_cl);
1062         OBD_FREE_PTR(ed);
1063
1064         cl_env_cache_purge(~0);
1065
1066         return NULL;
1067 }
1068
1069 static const struct lu_device_type_operations echo_device_type_ops = {
1070         .ldto_init = echo_type_init,
1071         .ldto_fini = echo_type_fini,
1072
1073         .ldto_start = echo_type_start,
1074         .ldto_stop  = echo_type_stop,
1075
1076         .ldto_device_alloc = echo_device_alloc,
1077         .ldto_device_free  = echo_device_free,
1078         .ldto_device_init  = echo_device_init,
1079         .ldto_device_fini  = echo_device_fini
1080 };
1081
1082 static struct lu_device_type echo_device_type = {
1083         .ldt_tags     = LU_DEVICE_CL,
1084         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
1085         .ldt_ops      = &echo_device_type_ops,
1086         .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1087 };
1088 /** @} echo_init */
1089
1090 /** \defgroup echo_exports Exported operations
1091  *
1092  * exporting functions to echo client
1093  *
1094  * @{
1095  */
1096
1097 /* Interfaces to echo client obd device */
1098 static struct echo_object *
1099 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
1100 {
1101         struct lu_env *env;
1102         struct echo_thread_info *info;
1103         struct echo_object_conf *conf;
1104         struct echo_object *eco;
1105         struct cl_object *obj;
1106         struct lov_oinfo *oinfo = NULL;
1107         struct lu_fid *fid;
1108         __u16  refcheck;
1109         int rc;
1110         ENTRY;
1111
1112         LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
1113         LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
1114
1115         /* Never return an object if the obd is to be freed. */
1116         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1117                 RETURN(ERR_PTR(-ENODEV));
1118
1119         env = cl_env_get(&refcheck);
1120         if (IS_ERR(env))
1121                 RETURN((void *)env);
1122
1123         info = echo_env_info(env);
1124         conf = &info->eti_conf;
1125         if (d->ed_next) {
1126                 OBD_ALLOC_PTR(oinfo);
1127                 if (oinfo == NULL)
1128                         GOTO(out, eco = ERR_PTR(-ENOMEM));
1129
1130                 oinfo->loi_oi = *oi;
1131                 conf->eoc_cl.u.coc_oinfo = oinfo;
1132         }
1133
1134         /* If echo_object_init() is successful then ownership of oinfo
1135          * is transferred to the object. */
1136         conf->eoc_oinfo = &oinfo;
1137
1138         fid = &info->eti_fid;
1139         rc = ostid_to_fid(fid, oi, 0);
1140         if (rc != 0)
1141                 GOTO(out, eco = ERR_PTR(rc));
1142
1143         /* In the function below, .hs_keycmp resolves to
1144          * lu_obj_hop_keycmp() */
1145         /* coverity[overrun-buffer-val] */
1146         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1147         if (IS_ERR(obj))
1148                 GOTO(out, eco = (void*)obj);
1149
1150         eco = cl2echo_obj(obj);
1151         if (eco->eo_deleted) {
1152                 cl_object_put(env, obj);
1153                 eco = ERR_PTR(-EAGAIN);
1154         }
1155
1156 out:
1157         if (oinfo != NULL)
1158                 OBD_FREE_PTR(oinfo);
1159
1160         cl_env_put(env, &refcheck);
1161         RETURN(eco);
1162 }
1163
1164 static int cl_echo_object_put(struct echo_object *eco)
1165 {
1166         struct lu_env *env;
1167         struct cl_object *obj = echo_obj2cl(eco);
1168         __u16  refcheck;
1169         ENTRY;
1170
1171         env = cl_env_get(&refcheck);
1172         if (IS_ERR(env))
1173                 RETURN(PTR_ERR(env));
1174
1175         /* an external function to kill an object? */
1176         if (eco->eo_deleted) {
1177                 struct lu_object_header *loh = obj->co_lu.lo_header;
1178                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1179                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1180         }
1181
1182         cl_object_put(env, obj);
1183         cl_env_put(env, &refcheck);
1184         RETURN(0);
1185 }
1186
1187 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1188                             u64 start, u64 end, int mode,
1189                             __u64 *cookie , __u32 enqflags)
1190 {
1191         struct cl_io *io;
1192         struct cl_lock *lck;
1193         struct cl_object *obj;
1194         struct cl_lock_descr *descr;
1195         struct echo_thread_info *info;
1196         int rc = -ENOMEM;
1197         ENTRY;
1198
1199         info = echo_env_info(env);
1200         io = &info->eti_io;
1201         lck = &info->eti_lock;
1202         obj = echo_obj2cl(eco);
1203
1204         memset(lck, 0, sizeof(*lck));
1205         descr = &lck->cll_descr;
1206         descr->cld_obj   = obj;
1207         descr->cld_start = cl_index(obj, start);
1208         descr->cld_end   = cl_index(obj, end);
1209         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1210         descr->cld_enq_flags = enqflags;
1211         io->ci_obj = obj;
1212
1213         rc = cl_lock_request(env, io, lck);
1214         if (rc == 0) {
1215                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1216                 struct echo_lock *el;
1217
1218                 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1219                 spin_lock(&ec->ec_lock);
1220                 if (list_empty(&el->el_chain)) {
1221                         list_add(&el->el_chain, &ec->ec_locks);
1222                         el->el_cookie = ++ec->ec_unique;
1223                 }
1224                 atomic_inc(&el->el_refcount);
1225                 *cookie = el->el_cookie;
1226                 spin_unlock(&ec->ec_lock);
1227         }
1228         RETURN(rc);
1229 }
1230
1231 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1232                            __u64 cookie)
1233 {
1234         struct echo_client_obd *ec = ed->ed_ec;
1235         struct echo_lock       *ecl = NULL;
1236         struct list_head        *el;
1237         int found = 0, still_used = 0;
1238         ENTRY;
1239
1240         LASSERT(ec != NULL);
1241         spin_lock(&ec->ec_lock);
1242         list_for_each(el, &ec->ec_locks) {
1243                 ecl = list_entry(el, struct echo_lock, el_chain);
1244                 CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
1245                 found = (ecl->el_cookie == cookie);
1246                 if (found) {
1247                         if (atomic_dec_and_test(&ecl->el_refcount))
1248                                 list_del_init(&ecl->el_chain);
1249                         else
1250                                 still_used = 1;
1251                         break;
1252                 }
1253         }
1254         spin_unlock(&ec->ec_lock);
1255
1256         if (!found)
1257                 RETURN(-ENOENT);
1258
1259         echo_lock_release(env, ecl, still_used);
1260         RETURN(0);
1261 }
1262
1263 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1264                                 struct cl_page *page)
1265 {
1266         struct echo_thread_info *info;
1267         struct cl_2queue        *queue;
1268
1269         info = echo_env_info(env);
1270         LASSERT(io == &info->eti_io);
1271
1272         queue = &info->eti_queue;
1273         cl_page_list_add(&queue->c2_qout, page);
1274 }
1275
1276 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1277                               struct page **pages, int npages, int async)
1278 {
1279         struct lu_env           *env;
1280         struct echo_thread_info *info;
1281         struct cl_object        *obj = echo_obj2cl(eco);
1282         struct echo_device      *ed  = eco->eo_dev;
1283         struct cl_2queue        *queue;
1284         struct cl_io            *io;
1285         struct cl_page          *clp;
1286         struct lustre_handle    lh = { 0 };
1287         int page_size = cl_page_size(obj);
1288         int rc;
1289         int i;
1290         __u16 refcheck;
1291         ENTRY;
1292
1293         LASSERT((offset & ~PAGE_MASK) == 0);
1294         LASSERT(ed->ed_next != NULL);
1295         env = cl_env_get(&refcheck);
1296         if (IS_ERR(env))
1297                 RETURN(PTR_ERR(env));
1298
1299         info    = echo_env_info(env);
1300         io      = &info->eti_io;
1301         queue   = &info->eti_queue;
1302
1303         cl_2queue_init(queue);
1304
1305         io->ci_ignore_layout = 1;
1306         rc = cl_io_init(env, io, CIT_MISC, obj);
1307         if (rc < 0)
1308                 GOTO(out, rc);
1309         LASSERT(rc == 0);
1310
1311
1312         rc = cl_echo_enqueue0(env, eco, offset,
1313                               offset + npages * PAGE_CACHE_SIZE - 1,
1314                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1315                               CEF_NEVER);
1316         if (rc < 0)
1317                 GOTO(error_lock, rc);
1318
1319         for (i = 0; i < npages; i++) {
1320                 LASSERT(pages[i]);
1321                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1322                                    pages[i], CPT_TRANSIENT);
1323                 if (IS_ERR(clp)) {
1324                         rc = PTR_ERR(clp);
1325                         break;
1326                 }
1327                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1328
1329                 rc = cl_page_own(env, io, clp);
1330                 if (rc) {
1331                         LASSERT(clp->cp_state == CPS_FREEING);
1332                         cl_page_put(env, clp);
1333                         break;
1334                 }
1335
1336                 cl_2queue_add(queue, clp);
1337
1338                 /* drop the reference count for cl_page_find, so that the page
1339                  * will be freed in cl_2queue_fini. */
1340                 cl_page_put(env, clp);
1341                 cl_page_clip(env, clp, 0, page_size);
1342
1343                 offset += page_size;
1344         }
1345
1346         if (rc == 0) {
1347                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1348
1349                 async = async && (typ == CRT_WRITE);
1350                 if (async)
1351                         rc = cl_io_commit_async(env, io, &queue->c2_qin,
1352                                                 0, PAGE_SIZE,
1353                                                 echo_commit_callback);
1354                 else
1355                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1356                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1357                        async ? "async" : "sync", rc);
1358         }
1359
1360         cl_echo_cancel0(env, ed, lh.cookie);
1361         EXIT;
1362 error_lock:
1363         cl_2queue_discard(env, io, queue);
1364         cl_2queue_disown(env, io, queue);
1365         cl_2queue_fini(env, queue);
1366         cl_io_fini(env, io);
1367 out:
1368         cl_env_put(env, &refcheck);
1369         return rc;
1370 }
1371 /** @} echo_exports */
1372
1373
1374 static u64 last_object_id;
1375
1376 #ifdef HAVE_SERVER_SUPPORT
1377 static inline void echo_md_build_name(struct lu_name *lname, char *name,
1378                                       __u64 id)
1379 {
1380         sprintf(name, LPU64, id);
1381         lname->ln_name = name;
1382         lname->ln_namelen = strlen(name);
1383 }
1384
1385 /* similar to mdt_attr_get_complex */
1386 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1387                             struct md_attr *ma)
1388 {
1389         struct echo_thread_info *info = echo_env_info(env);
1390         int                      rc;
1391
1392         ENTRY;
1393
1394         LASSERT(ma->ma_lmm_size > 0);
1395
1396         rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1397         if (rc < 0)
1398                 RETURN(rc);
1399
1400         /* big_lmm may need to be grown */
1401         if (info->eti_big_lmmsize < rc) {
1402                 int size = size_roundup_power2(rc);
1403
1404                 if (info->eti_big_lmmsize > 0) {
1405                         /* free old buffer */
1406                         LASSERT(info->eti_big_lmm);
1407                         OBD_FREE_LARGE(info->eti_big_lmm,
1408                                        info->eti_big_lmmsize);
1409                         info->eti_big_lmm = NULL;
1410                         info->eti_big_lmmsize = 0;
1411                 }
1412
1413                 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1414                 if (info->eti_big_lmm == NULL)
1415                         RETURN(-ENOMEM);
1416                 info->eti_big_lmmsize = size;
1417         }
1418         LASSERT(info->eti_big_lmmsize >= rc);
1419
1420         info->eti_buf.lb_buf = info->eti_big_lmm;
1421         info->eti_buf.lb_len = info->eti_big_lmmsize;
1422         rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1423         if (rc < 0)
1424                 RETURN(rc);
1425
1426         ma->ma_valid |= MA_LOV;
1427         ma->ma_lmm = info->eti_big_lmm;
1428         ma->ma_lmm_size = rc;
1429
1430         RETURN(0);
1431 }
1432
1433 static int echo_attr_get_complex(const struct lu_env *env,
1434                                  struct md_object *next,
1435                                  struct md_attr *ma)
1436 {
1437         struct echo_thread_info *info = echo_env_info(env);
1438         struct lu_buf           *buf = &info->eti_buf;
1439         umode_t          mode = lu_object_attr(&next->mo_lu);
1440         int                      need = ma->ma_need;
1441         int                      rc = 0, rc2;
1442
1443         ENTRY;
1444
1445         ma->ma_valid = 0;
1446
1447         if (need & MA_INODE) {
1448                 ma->ma_need = MA_INODE;
1449                 rc = mo_attr_get(env, next, ma);
1450                 if (rc)
1451                         GOTO(out, rc);
1452                 ma->ma_valid |= MA_INODE;
1453         }
1454
1455         if (need & MA_LOV) {
1456                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1457                         LASSERT(ma->ma_lmm_size > 0);
1458                         buf->lb_buf = ma->ma_lmm;
1459                         buf->lb_len = ma->ma_lmm_size;
1460                         rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1461                         if (rc2 > 0) {
1462                                 ma->ma_lmm_size = rc2;
1463                                 ma->ma_valid |= MA_LOV;
1464                         } else if (rc2 == -ENODATA) {
1465                                 /* no LOV EA */
1466                                 ma->ma_lmm_size = 0;
1467                         } else if (rc2 == -ERANGE) {
1468                                 rc2 = echo_big_lmm_get(env, next, ma);
1469                                 if (rc2 < 0)
1470                                         GOTO(out, rc = rc2);
1471                         } else {
1472                                 GOTO(out, rc = rc2);
1473                         }
1474                 }
1475         }
1476
1477 #ifdef CONFIG_FS_POSIX_ACL
1478         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1479                 buf->lb_buf = ma->ma_acl;
1480                 buf->lb_len = ma->ma_acl_size;
1481                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1482                 if (rc2 > 0) {
1483                         ma->ma_acl_size = rc2;
1484                         ma->ma_valid |= MA_ACL_DEF;
1485                 } else if (rc2 == -ENODATA) {
1486                         /* no ACLs */
1487                         ma->ma_acl_size = 0;
1488                 } else {
1489                         GOTO(out, rc = rc2);
1490                 }
1491         }
1492 #endif
1493 out:
1494         ma->ma_need = need;
1495         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
1496                rc, ma->ma_valid, ma->ma_lmm);
1497         RETURN(rc);
1498 }
1499
1500 static int
1501 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1502                         struct md_object *parent, struct lu_fid *fid,
1503                         struct lu_name *lname, struct md_op_spec *spec,
1504                         struct md_attr *ma)
1505 {
1506         struct lu_object        *ec_child, *child;
1507         struct lu_device        *ld = ed->ed_next;
1508         struct echo_thread_info *info = echo_env_info(env);
1509         struct lu_fid           *fid2 = &info->eti_fid2;
1510         struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
1511         int                      rc;
1512
1513         ENTRY;
1514
1515         rc = mdo_lookup(env, parent, lname, fid2, spec);
1516         if (rc == 0)
1517                 return -EEXIST;
1518         else if (rc != -ENOENT)
1519                 return rc;
1520
1521         ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1522                                      fid, &conf);
1523         if (IS_ERR(ec_child)) {
1524                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1525                         PTR_ERR(ec_child));
1526                 RETURN(PTR_ERR(ec_child));
1527         }
1528
1529         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1530         if (child == NULL) {
1531                 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1532                 GOTO(out_put, rc = -EINVAL);
1533         }
1534
1535         CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1536                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1537
1538         /*
1539          * Do not perform lookup sanity check. We know that name does not exist.
1540          */
1541         spec->sp_cr_lookup = 0;
1542         rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1543         if (rc) {
1544                 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1545                 GOTO(out_put, rc);
1546         }
1547         CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
1548                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1549         EXIT;
1550 out_put:
1551         lu_object_put(env, ec_child);
1552         return rc;
1553 }
1554
1555 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1556                              struct md_attr *ma)
1557 {
1558         struct echo_thread_info *info = echo_env_info(env);
1559
1560         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1561                 ma->ma_lmm = (void *)&info->eti_lmm;
1562                 ma->ma_lmm_size = sizeof(info->eti_lmm);
1563         } else {
1564                 LASSERT(info->eti_big_lmmsize);
1565                 ma->ma_lmm = info->eti_big_lmm;
1566                 ma->ma_lmm_size = info->eti_big_lmmsize;
1567         }
1568
1569         return 0;
1570 }
1571
1572 static int echo_create_md_object(const struct lu_env *env,
1573                                  struct echo_device *ed,
1574                                  struct lu_object *ec_parent,
1575                                  struct lu_fid *fid,
1576                                  char *name, int namelen,
1577                                  __u64 id, __u32 mode, int count,
1578                                  int stripe_count, int stripe_offset)
1579 {
1580         struct lu_object        *parent;
1581         struct echo_thread_info *info = echo_env_info(env);
1582         struct lu_name          *lname = &info->eti_lname;
1583         struct md_op_spec       *spec = &info->eti_spec;
1584         struct md_attr          *ma = &info->eti_ma;
1585         struct lu_device        *ld = ed->ed_next;
1586         int                      rc = 0;
1587         int                      i;
1588
1589         ENTRY;
1590
1591         if (ec_parent == NULL)
1592                 return -1;
1593         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1594         if (parent == NULL)
1595                 RETURN(-ENXIO);
1596
1597         memset(ma, 0, sizeof(*ma));
1598         memset(spec, 0, sizeof(*spec));
1599         if (stripe_count != 0) {
1600                 spec->sp_cr_flags |= FMODE_WRITE;
1601                 echo_set_lmm_size(env, ld, ma);
1602                 if (stripe_count != -1) {
1603                         struct lov_user_md_v3 *lum = &info->eti_lum;
1604
1605                         lum->lmm_magic = LOV_USER_MAGIC_V3;
1606                         lum->lmm_stripe_count = stripe_count;
1607                         lum->lmm_stripe_offset = stripe_offset;
1608                         lum->lmm_pattern = 0;
1609                         spec->u.sp_ea.eadata = lum;
1610                         spec->u.sp_ea.eadatalen = sizeof(*lum);
1611                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1612                 }
1613         }
1614
1615         ma->ma_attr.la_mode = mode;
1616         ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1617         ma->ma_attr.la_ctime = cfs_time_current_64();
1618
1619         if (name != NULL) {
1620                 lname->ln_name = name;
1621                 lname->ln_namelen = namelen;
1622                 /* If name is specified, only create one object by name */
1623                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1624                                              spec, ma);
1625                 RETURN(rc);
1626         }
1627
1628         /* Create multiple object sequenced by id */
1629         for (i = 0; i < count; i++) {
1630                 char *tmp_name = info->eti_name;
1631
1632                 echo_md_build_name(lname, tmp_name, id);
1633
1634                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1635                                              spec, ma);
1636                 if (rc) {
1637                         CERROR("Can not create child %s: rc = %d\n", tmp_name,
1638                                 rc);
1639                         break;
1640                 }
1641                 id++;
1642                 fid->f_oid++;
1643         }
1644
1645         RETURN(rc);
1646 }
1647
1648 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1649                                         struct echo_device *ed,
1650                                         struct md_object *parent,
1651                                         struct lu_name *lname)
1652 {
1653         struct echo_thread_info *info = echo_env_info(env);
1654         struct lu_fid           *fid = &info->eti_fid;
1655         struct lu_object        *child;
1656         int    rc;
1657         ENTRY;
1658
1659         CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1660                PFID(fid), parent);
1661         rc = mdo_lookup(env, parent, lname, fid, NULL);
1662         if (rc) {
1663                 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1664                 RETURN(ERR_PTR(rc));
1665         }
1666
1667         /* In the function below, .hs_keycmp resolves to
1668          * lu_obj_hop_keycmp() */
1669         /* coverity[overrun-buffer-val] */
1670         child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1671
1672         RETURN(child);
1673 }
1674
1675 static int echo_setattr_object(const struct lu_env *env,
1676                                struct echo_device *ed,
1677                                struct lu_object *ec_parent,
1678                                __u64 id, int count)
1679 {
1680         struct lu_object        *parent;
1681         struct echo_thread_info *info = echo_env_info(env);
1682         struct lu_name          *lname = &info->eti_lname;
1683         char                    *name = info->eti_name;
1684         struct lu_device        *ld = ed->ed_next;
1685         struct lu_buf           *buf = &info->eti_buf;
1686         int                      rc = 0;
1687         int                      i;
1688
1689         ENTRY;
1690
1691         if (ec_parent == NULL)
1692                 return -1;
1693         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1694         if (parent == NULL)
1695                 RETURN(-ENXIO);
1696
1697         for (i = 0; i < count; i++) {
1698                 struct lu_object *ec_child, *child;
1699
1700                 echo_md_build_name(lname, name, id);
1701
1702                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1703                 if (IS_ERR(ec_child)) {
1704                         CERROR("Can't find child %s: rc = %ld\n",
1705                                 lname->ln_name, PTR_ERR(ec_child));
1706                         RETURN(PTR_ERR(ec_child));
1707                 }
1708
1709                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1710                 if (child == NULL) {
1711                         CERROR("Can not locate the child %s\n", lname->ln_name);
1712                         lu_object_put(env, ec_child);
1713                         rc = -EINVAL;
1714                         break;
1715                 }
1716
1717                 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1718                        PFID(lu_object_fid(child)));
1719
1720                 buf->lb_buf = info->eti_xattr_buf;
1721                 buf->lb_len = sizeof(info->eti_xattr_buf);
1722
1723                 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1724                 rc = mo_xattr_set(env, lu2md(child), buf, name,
1725                                   LU_XATTR_CREATE);
1726                 if (rc < 0) {
1727                         CERROR("Can not setattr child "DFID": rc = %d\n",
1728                                 PFID(lu_object_fid(child)), rc);
1729                         lu_object_put(env, ec_child);
1730                         break;
1731                 }
1732                 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1733                        PFID(lu_object_fid(child)));
1734                 id++;
1735                 lu_object_put(env, ec_child);
1736         }
1737         RETURN(rc);
1738 }
1739
1740 static int echo_getattr_object(const struct lu_env *env,
1741                                struct echo_device *ed,
1742                                struct lu_object *ec_parent,
1743                                __u64 id, int count)
1744 {
1745         struct lu_object        *parent;
1746         struct echo_thread_info *info = echo_env_info(env);
1747         struct lu_name          *lname = &info->eti_lname;
1748         char                    *name = info->eti_name;
1749         struct md_attr          *ma = &info->eti_ma;
1750         struct lu_device        *ld = ed->ed_next;
1751         int                      rc = 0;
1752         int                      i;
1753
1754         ENTRY;
1755
1756         if (ec_parent == NULL)
1757                 return -1;
1758         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1759         if (parent == NULL)
1760                 RETURN(-ENXIO);
1761
1762         memset(ma, 0, sizeof(*ma));
1763         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1764         ma->ma_acl = info->eti_xattr_buf;
1765         ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1766
1767         for (i = 0; i < count; i++) {
1768                 struct lu_object *ec_child, *child;
1769
1770                 ma->ma_valid = 0;
1771                 echo_md_build_name(lname, name, id);
1772                 echo_set_lmm_size(env, ld, ma);
1773
1774                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1775                 if (IS_ERR(ec_child)) {
1776                         CERROR("Can't find child %s: rc = %ld\n",
1777                                lname->ln_name, PTR_ERR(ec_child));
1778                         RETURN(PTR_ERR(ec_child));
1779                 }
1780
1781                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1782                 if (child == NULL) {
1783                         CERROR("Can not locate the child %s\n", lname->ln_name);
1784                         lu_object_put(env, ec_child);
1785                         RETURN(-EINVAL);
1786                 }
1787
1788                 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1789                        PFID(lu_object_fid(child)));
1790                 rc = echo_attr_get_complex(env, lu2md(child), ma);
1791                 if (rc) {
1792                         CERROR("Can not getattr child "DFID": rc = %d\n",
1793                                 PFID(lu_object_fid(child)), rc);
1794                         lu_object_put(env, ec_child);
1795                         break;
1796                 }
1797                 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1798                        PFID(lu_object_fid(child)));
1799                 id++;
1800                 lu_object_put(env, ec_child);
1801         }
1802
1803         RETURN(rc);
1804 }
1805
1806 static int echo_lookup_object(const struct lu_env *env,
1807                               struct echo_device *ed,
1808                               struct lu_object *ec_parent,
1809                               __u64 id, int count)
1810 {
1811         struct lu_object        *parent;
1812         struct echo_thread_info *info = echo_env_info(env);
1813         struct lu_name          *lname = &info->eti_lname;
1814         char                    *name = info->eti_name;
1815         struct lu_fid           *fid = &info->eti_fid;
1816         struct lu_device        *ld = ed->ed_next;
1817         int                      rc = 0;
1818         int                      i;
1819
1820         if (ec_parent == NULL)
1821                 return -1;
1822         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1823         if (parent == NULL)
1824                 return -ENXIO;
1825
1826         /*prepare the requests*/
1827         for (i = 0; i < count; i++) {
1828                 echo_md_build_name(lname, name, id);
1829
1830                 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1831                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1832
1833                 rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
1834                 if (rc) {
1835                         CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1836                         break;
1837                 }
1838                 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1839                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1840
1841                 id++;
1842         }
1843         return rc;
1844 }
1845
1846 static int echo_md_destroy_internal(const struct lu_env *env,
1847                                     struct echo_device *ed,
1848                                     struct md_object *parent,
1849                                     struct lu_name *lname,
1850                                     struct md_attr *ma)
1851 {
1852         struct lu_device   *ld = ed->ed_next;
1853         struct lu_object   *ec_child;
1854         struct lu_object   *child;
1855         int                 rc;
1856
1857         ENTRY;
1858
1859         ec_child = echo_md_lookup(env, ed, parent, lname);
1860         if (IS_ERR(ec_child)) {
1861                 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1862                         PTR_ERR(ec_child));
1863                 RETURN(PTR_ERR(ec_child));
1864         }
1865
1866         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1867         if (child == NULL) {
1868                 CERROR("Can not locate the child %s\n", lname->ln_name);
1869                 GOTO(out_put, rc = -EINVAL);
1870         }
1871
1872         if (lu_object_remote(child)) {
1873                 CERROR("Can not destroy remote object %s: rc = %d\n",
1874                        lname->ln_name, -EPERM);
1875                 GOTO(out_put, rc = -EPERM);
1876         }
1877         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1878                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1879
1880         rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
1881         if (rc) {
1882                 CERROR("Can not unlink child %s: rc = %d\n",
1883                         lname->ln_name, rc);
1884                 GOTO(out_put, rc);
1885         }
1886         CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1887                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1888 out_put:
1889         lu_object_put(env, ec_child);
1890         return rc;
1891 }
1892
1893 static int echo_destroy_object(const struct lu_env *env,
1894                                struct echo_device *ed,
1895                                struct lu_object *ec_parent,
1896                                char *name, int namelen,
1897                                __u64 id, __u32 mode,
1898                                int count)
1899 {
1900         struct echo_thread_info *info = echo_env_info(env);
1901         struct lu_name          *lname = &info->eti_lname;
1902         struct md_attr          *ma = &info->eti_ma;
1903         struct lu_device        *ld = ed->ed_next;
1904         struct lu_object        *parent;
1905         int                      rc = 0;
1906         int                      i;
1907         ENTRY;
1908
1909         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1910         if (parent == NULL)
1911                 RETURN(-EINVAL);
1912
1913         memset(ma, 0, sizeof(*ma));
1914         ma->ma_attr.la_mode = mode;
1915         ma->ma_attr.la_valid = LA_CTIME;
1916         ma->ma_attr.la_ctime = cfs_time_current_64();
1917         ma->ma_need = MA_INODE;
1918         ma->ma_valid = 0;
1919
1920         if (name != NULL) {
1921                 lname->ln_name = name;
1922                 lname->ln_namelen = namelen;
1923                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1924                                               ma);
1925                 RETURN(rc);
1926         }
1927
1928         /*prepare the requests*/
1929         for (i = 0; i < count; i++) {
1930                 char *tmp_name = info->eti_name;
1931
1932                 ma->ma_valid = 0;
1933                 echo_md_build_name(lname, tmp_name, id);
1934
1935                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1936                                               ma);
1937                 if (rc) {
1938                         CERROR("Can not unlink child %s: rc = %d\n", name, rc);
1939                         break;
1940                 }
1941                 id++;
1942         }
1943
1944         RETURN(rc);
1945 }
1946
1947 static struct lu_object *echo_resolve_path(const struct lu_env *env,
1948                                            struct echo_device *ed, char *path,
1949                                            int path_len)
1950 {
1951         struct lu_device        *ld = ed->ed_next;
1952         struct echo_thread_info *info = echo_env_info(env);
1953         struct lu_fid           *fid = &info->eti_fid;
1954         struct lu_name          *lname = &info->eti_lname;
1955         struct lu_object        *parent = NULL;
1956         struct lu_object        *child = NULL;
1957         int                      rc = 0;
1958         ENTRY;
1959
1960         *fid = ed->ed_root_fid;
1961
1962         /* In the function below, .hs_keycmp resolves to
1963          * lu_obj_hop_keycmp() */
1964         /* coverity[overrun-buffer-val] */
1965         parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1966         if (IS_ERR(parent)) {
1967                 CERROR("Can not find the parent "DFID": rc = %ld\n",
1968                         PFID(fid), PTR_ERR(parent));
1969                 RETURN(parent);
1970         }
1971
1972         while (1) {
1973                 struct lu_object *ld_parent;
1974                 char *e;
1975
1976                 e = strsep(&path, "/");
1977                 if (e == NULL)
1978                         break;
1979
1980                 if (e[0] == 0) {
1981                         if (!path || path[0] == '\0')
1982                                 break;
1983                         continue;
1984                 }
1985
1986                 lname->ln_name = e;
1987                 lname->ln_namelen = strlen(e);
1988
1989                 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
1990                 if (ld_parent == NULL) {
1991                         lu_object_put(env, parent);
1992                         rc = -EINVAL;
1993                         break;
1994                 }
1995
1996                 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
1997                 lu_object_put(env, parent);
1998                 if (IS_ERR(child)) {
1999                         rc = (int)PTR_ERR(child);
2000                         CERROR("lookup %s under parent "DFID": rc = %d\n",
2001                                 lname->ln_name, PFID(lu_object_fid(ld_parent)),
2002                                 rc);
2003                         break;
2004                 }
2005                 parent = child;
2006         }
2007         if (rc)
2008                 RETURN(ERR_PTR(rc));
2009
2010         RETURN(parent);
2011 }
2012
2013 static void echo_ucred_init(struct lu_env *env)
2014 {
2015         struct lu_ucred *ucred = lu_ucred(env);
2016
2017         ucred->uc_valid = UCRED_INVALID;
2018
2019         ucred->uc_suppgids[0] = -1;
2020         ucred->uc_suppgids[1] = -1;
2021
2022         ucred->uc_uid = ucred->uc_o_uid  =
2023                                 from_kuid(&init_user_ns, current_uid());
2024         ucred->uc_gid = ucred->uc_o_gid  =
2025                                 from_kgid(&init_user_ns, current_gid());
2026         ucred->uc_fsuid = ucred->uc_o_fsuid =
2027                                 from_kuid(&init_user_ns, current_fsuid());
2028         ucred->uc_fsgid = ucred->uc_o_fsgid =
2029                                 from_kgid(&init_user_ns, current_fsgid());
2030         ucred->uc_cap = cfs_curproc_cap_pack();
2031
2032         /* remove fs privilege for non-root user. */
2033         if (ucred->uc_fsuid)
2034                 ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2035         ucred->uc_valid = UCRED_NEW;
2036 }
2037
2038 static void echo_ucred_fini(struct lu_env *env)
2039 {
2040         struct lu_ucred *ucred = lu_ucred(env);
2041         ucred->uc_valid = UCRED_INIT;
2042 }
2043
2044 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
2045 #define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
2046 static int echo_md_handler(struct echo_device *ed, int command,
2047                            char *path, int path_len, __u64 id, int count,
2048                            struct obd_ioctl_data *data)
2049 {
2050         struct echo_thread_info *info;
2051         struct lu_device      *ld = ed->ed_next;
2052         struct lu_env         *env;
2053         __u16                  refcheck;
2054         struct lu_object      *parent;
2055         char                  *name = NULL;
2056         int                    namelen = data->ioc_plen2;
2057         int                    rc = 0;
2058         ENTRY;
2059
2060         if (ld == NULL) {
2061                 CERROR("MD echo client is not being initialized properly\n");
2062                 RETURN(-EINVAL);
2063         }
2064
2065         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2066                 CERROR("Only support MDD layer right now!\n");
2067                 RETURN(-EINVAL);
2068         }
2069
2070         env = cl_env_get(&refcheck);
2071         if (IS_ERR(env))
2072                 RETURN(PTR_ERR(env));
2073
2074         rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
2075         if (rc != 0)
2076                 GOTO(out_env, rc);
2077
2078         /* init big_lmm buffer */
2079         info = echo_env_info(env);
2080         LASSERT(info->eti_big_lmm == NULL);
2081         OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2082         if (info->eti_big_lmm == NULL)
2083                 GOTO(out_env, rc = -ENOMEM);
2084         info->eti_big_lmmsize = MIN_MD_SIZE;
2085
2086         parent = echo_resolve_path(env, ed, path, path_len);
2087         if (IS_ERR(parent)) {
2088                 CERROR("Can not resolve the path %s: rc = %ld\n", path,
2089                         PTR_ERR(parent));
2090                 GOTO(out_free, rc = PTR_ERR(parent));
2091         }
2092
2093         if (namelen > 0) {
2094                 OBD_ALLOC(name, namelen + 1);
2095                 if (name == NULL)
2096                         GOTO(out_put, rc = -ENOMEM);
2097                 if (copy_from_user(name, data->ioc_pbuf2, namelen))
2098                         GOTO(out_name, rc = -EFAULT);
2099         }
2100
2101         echo_ucred_init(env);
2102
2103         switch (command) {
2104         case ECHO_MD_CREATE:
2105         case ECHO_MD_MKDIR: {
2106                 struct echo_thread_info *info = echo_env_info(env);
2107                 __u32 mode = data->ioc_obdo2.o_mode;
2108                 struct lu_fid *fid = &info->eti_fid;
2109                 int stripe_count = (int)data->ioc_obdo2.o_misc;
2110                 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2111
2112                 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2113                 if (rc != 0)
2114                         break;
2115
2116                 /* In the function below, .hs_keycmp resolves to
2117                  * lu_obj_hop_keycmp() */
2118                 /* coverity[overrun-buffer-val] */
2119                 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2120                                            id, mode, count, stripe_count,
2121                                            stripe_index);
2122                 break;
2123         }
2124         case ECHO_MD_DESTROY:
2125         case ECHO_MD_RMDIR: {
2126                 __u32 mode = data->ioc_obdo2.o_mode;
2127
2128                 rc = echo_destroy_object(env, ed, parent, name, namelen,
2129                                          id, mode, count);
2130                 break;
2131         }
2132         case ECHO_MD_LOOKUP:
2133                 rc = echo_lookup_object(env, ed, parent, id, count);
2134                 break;
2135         case ECHO_MD_GETATTR:
2136                 rc = echo_getattr_object(env, ed, parent, id, count);
2137                 break;
2138         case ECHO_MD_SETATTR:
2139                 rc = echo_setattr_object(env, ed, parent, id, count);
2140                 break;
2141         default:
2142                 CERROR("unknown command %d\n", command);
2143                 rc = -EINVAL;
2144                 break;
2145         }
2146         echo_ucred_fini(env);
2147
2148 out_name:
2149         if (name != NULL)
2150                 OBD_FREE(name, namelen + 1);
2151 out_put:
2152         lu_object_put(env, parent);
2153 out_free:
2154         LASSERT(info->eti_big_lmm);
2155         OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2156         info->eti_big_lmm = NULL;
2157         info->eti_big_lmmsize = 0;
2158 out_env:
2159         cl_env_put(env, &refcheck);
2160         return rc;
2161 }
2162 #endif /* HAVE_SERVER_SUPPORT */
2163
2164 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2165                               struct obdo *oa)
2166 {
2167         struct echo_object      *eco;
2168         struct echo_client_obd  *ec = ed->ed_ec;
2169         int created = 0;
2170         int rc;
2171         ENTRY;
2172
2173         if (!(oa->o_valid & OBD_MD_FLID) ||
2174             !(oa->o_valid & OBD_MD_FLGROUP) ||
2175             !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
2176                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2177                 RETURN(-EINVAL);
2178         }
2179
2180         if (ostid_id(&oa->o_oi) == 0)
2181                 ostid_set_id(&oa->o_oi, ++last_object_id);
2182
2183         rc = obd_create(env, ec->ec_exp, oa);
2184         if (rc != 0) {
2185                 CERROR("Cannot create objects: rc = %d\n", rc);
2186                 GOTO(failed, rc);
2187         }
2188
2189         created = 1;
2190
2191         oa->o_valid |= OBD_MD_FLID;
2192
2193         eco = cl_echo_object_find(ed, &oa->o_oi);
2194         if (IS_ERR(eco))
2195                 GOTO(failed, rc = PTR_ERR(eco));
2196         cl_echo_object_put(eco);
2197
2198         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
2199         EXIT;
2200
2201 failed:
2202         if (created && rc != 0)
2203                 obd_destroy(env, ec->ec_exp, oa);
2204
2205         if (rc != 0)
2206                 CERROR("create object failed with: rc = %d\n", rc);
2207
2208         return rc;
2209 }
2210
2211 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2212                            struct obdo *oa)
2213 {
2214         struct echo_object *eco;
2215         int rc;
2216         ENTRY;
2217
2218         if (!(oa->o_valid & OBD_MD_FLID) ||
2219             !(oa->o_valid & OBD_MD_FLGROUP) ||
2220             ostid_id(&oa->o_oi) == 0) {
2221                 CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
2222                 RETURN(-EINVAL);
2223         }
2224
2225         rc = 0;
2226         eco = cl_echo_object_find(ed, &oa->o_oi);
2227         if (!IS_ERR(eco))
2228                 *ecop = eco;
2229         else
2230                 rc = PTR_ERR(eco);
2231
2232         RETURN(rc);
2233 }
2234
2235 static void echo_put_object(struct echo_object *eco)
2236 {
2237         int rc;
2238
2239         rc = cl_echo_object_put(eco);
2240         if (rc)
2241                 CERROR("%s: echo client drop an object failed: rc = %d\n",
2242                        eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
2243 }
2244
2245 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
2246                                          u64 offset, u64 count)
2247 {
2248         char    *addr;
2249         u64      stripe_off;
2250         u64      stripe_id;
2251         int      delta;
2252
2253         /* no partial pages on the client */
2254         LASSERT(count == PAGE_CACHE_SIZE);
2255
2256         addr = kmap(page);
2257
2258         for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2259                 if (rw == OBD_BRW_WRITE) {
2260                         stripe_off = offset + delta;
2261                         stripe_id = id;
2262                 } else {
2263                         stripe_off = 0xdeadbeef00c0ffeeULL;
2264                         stripe_id = 0xdeadbeef00c0ffeeULL;
2265                 }
2266                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2267                                   stripe_off, stripe_id);
2268         }
2269
2270         kunmap(page);
2271 }
2272
2273 static int
2274 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
2275 {
2276         u64      stripe_off;
2277         u64      stripe_id;
2278         char   *addr;
2279         int     delta;
2280         int     rc;
2281         int     rc2;
2282
2283         /* no partial pages on the client */
2284         LASSERT(count == PAGE_CACHE_SIZE);
2285
2286         addr = kmap(page);
2287
2288         for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2289                 stripe_off = offset + delta;
2290                 stripe_id = id;
2291
2292                 rc2 = block_debug_check("test_brw",
2293                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
2294                                         stripe_off, stripe_id);
2295                 if (rc2 != 0) {
2296                         CERROR ("Error in echo object "LPX64"\n", id);
2297                         rc = rc2;
2298                 }
2299         }
2300
2301         kunmap(page);
2302         return rc;
2303 }
2304
2305 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2306                             struct echo_object *eco, u64 offset,
2307                             u64 count, int async)
2308 {
2309         size_t                  npages;
2310         struct brw_page        *pga;
2311         struct brw_page        *pgp;
2312         struct page            **pages;
2313         u64                      off;
2314         size_t                  i;
2315         int                     rc;
2316         int                     verify;
2317         gfp_t                   gfp_mask;
2318         u32                     brw_flags = 0;
2319         ENTRY;
2320
2321         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2322                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2323                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2324
2325         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
2326
2327         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2328
2329         if ((count & (~PAGE_MASK)) != 0)
2330                 RETURN(-EINVAL);
2331
2332         /* XXX think again with misaligned I/O */
2333         npages = count >> PAGE_CACHE_SHIFT;
2334
2335         if (rw == OBD_BRW_WRITE)
2336                 brw_flags = OBD_BRW_ASYNC;
2337
2338         OBD_ALLOC(pga, npages * sizeof(*pga));
2339         if (pga == NULL)
2340                 RETURN(-ENOMEM);
2341
2342         OBD_ALLOC(pages, npages * sizeof(*pages));
2343         if (pages == NULL) {
2344                 OBD_FREE(pga, npages * sizeof(*pga));
2345                 RETURN(-ENOMEM);
2346         }
2347
2348         for (i = 0, pgp = pga, off = offset;
2349              i < npages;
2350              i++, pgp++, off += PAGE_CACHE_SIZE) {
2351
2352                 LASSERT(pgp->pg == NULL);       /* for cleanup */
2353
2354                 rc = -ENOMEM;
2355                 pgp->pg = alloc_page(gfp_mask);
2356                 if (pgp->pg == NULL)
2357                         goto out;
2358
2359                 pages[i] = pgp->pg;
2360                 pgp->count = PAGE_CACHE_SIZE;
2361                 pgp->off = off;
2362                 pgp->flag = brw_flags;
2363
2364                 if (verify)
2365                         echo_client_page_debug_setup(pgp->pg, rw,
2366                                                      ostid_id(&oa->o_oi), off,
2367                                                      pgp->count);
2368         }
2369
2370         /* brw mode can only be used at client */
2371         LASSERT(ed->ed_next != NULL);
2372         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2373
2374  out:
2375         if (rc != 0 || rw != OBD_BRW_READ)
2376                 verify = 0;
2377
2378         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2379                 if (pgp->pg == NULL)
2380                         continue;
2381
2382                 if (verify) {
2383                         int vrc;
2384                         vrc = echo_client_page_debug_check(pgp->pg,
2385                                                            ostid_id(&oa->o_oi),
2386                                                            pgp->off, pgp->count);
2387                         if (vrc != 0 && rc == 0)
2388                                 rc = vrc;
2389                 }
2390                 __free_page(pgp->pg);
2391         }
2392         OBD_FREE(pga, npages * sizeof(*pga));
2393         OBD_FREE(pages, npages * sizeof(*pages));
2394         RETURN(rc);
2395 }
2396
2397 static int echo_client_prep_commit(const struct lu_env *env,
2398                                    struct obd_export *exp, int rw,
2399                                    struct obdo *oa, struct echo_object *eco,
2400                                    u64 offset, u64 count,
2401                                    u64 batch, int async)
2402 {
2403         struct obd_ioobj         ioo;
2404         struct niobuf_local     *lnb;
2405         struct niobuf_remote     rnb;
2406         u64                      off;
2407         u64                      npages, tot_pages, apc;
2408         int i, ret = 0, brw_flags = 0;
2409
2410         ENTRY;
2411
2412         if (count <= 0 || (count & ~PAGE_CACHE_MASK) != 0)
2413                 RETURN(-EINVAL);
2414
2415         apc = npages = batch >> PAGE_CACHE_SHIFT;
2416         tot_pages = count >> PAGE_CACHE_SHIFT;
2417
2418         OBD_ALLOC(lnb, apc * sizeof(struct niobuf_local));
2419         if (lnb == NULL)
2420                 RETURN(-ENOMEM);
2421
2422         if (rw == OBD_BRW_WRITE && async)
2423                 brw_flags |= OBD_BRW_ASYNC;
2424
2425         obdo_to_ioobj(oa, &ioo);
2426
2427         off = offset;
2428
2429         for (; tot_pages > 0; tot_pages -= npages) {
2430                 int lpages;
2431
2432                 if (tot_pages < npages)
2433                         npages = tot_pages;
2434
2435                 rnb.rnb_offset = off;
2436                 rnb.rnb_len = npages * PAGE_CACHE_SIZE;
2437                 rnb.rnb_flags = brw_flags;
2438                 ioo.ioo_bufcnt = 1;
2439                 off += npages * PAGE_CACHE_SIZE;
2440
2441                 lpages = npages;
2442                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
2443                 if (ret != 0)
2444                         GOTO(out, ret);
2445
2446                 for (i = 0; i < lpages; i++) {
2447                         struct page *page = lnb[i].lnb_page;
2448
2449                         /* read past eof? */
2450                         if (page == NULL && lnb[i].lnb_rc == 0)
2451                                 continue;
2452
2453                         if (async)
2454                                 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
2455
2456                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2457                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2458                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2459                                 continue;
2460
2461                         if (rw == OBD_BRW_WRITE)
2462                                 echo_client_page_debug_setup(page, rw,
2463                                                         ostid_id(&oa->o_oi),
2464                                                         lnb[i].lnb_file_offset,
2465                                                         lnb[i].lnb_len);
2466                         else
2467                                 echo_client_page_debug_check(page,
2468                                                         ostid_id(&oa->o_oi),
2469                                                         lnb[i].lnb_file_offset,
2470                                                         lnb[i].lnb_len);
2471                 }
2472
2473                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
2474                                    ret);
2475                 if (ret != 0)
2476                         break;
2477
2478                 /* Reuse env context. */
2479                 lu_context_exit((struct lu_context *)&env->le_ctx);
2480                 lu_context_enter((struct lu_context *)&env->le_ctx);
2481         }
2482
2483 out:
2484         OBD_FREE(lnb, apc * sizeof(struct niobuf_local));
2485
2486         RETURN(ret);
2487 }
2488
2489 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2490                                  struct obd_export *exp,
2491                                  struct obd_ioctl_data *data)
2492 {
2493         struct obd_device *obd = class_exp2obd(exp);
2494         struct echo_device *ed = obd2echo_dev(obd);
2495         struct echo_client_obd *ec = ed->ed_ec;
2496         struct obdo *oa = &data->ioc_obdo1;
2497         struct echo_object *eco;
2498         int rc;
2499         int async = 0;
2500         long test_mode;
2501         ENTRY;
2502
2503         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2504
2505         rc = echo_get_object(&eco, ed, oa);
2506         if (rc)
2507                 RETURN(rc);
2508
2509         oa->o_valid &= ~OBD_MD_FLHANDLE;
2510
2511         /* OFD/obdfilter works only via prep/commit */
2512         test_mode = (long)data->ioc_pbuf1;
2513         if (ed->ed_next == NULL && test_mode != 3) {
2514                 test_mode = 3;
2515                 data->ioc_plen1 = data->ioc_count;
2516         }
2517
2518         if (test_mode == 3)
2519                 async = 1;
2520
2521         /* Truncate batch size to maximum */
2522         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2523                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2524
2525         switch (test_mode) {
2526         case 1:
2527                 /* fall through */
2528         case 2:
2529                 rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
2530                                       data->ioc_count, async);
2531                 break;
2532         case 3:
2533                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
2534                                              data->ioc_offset, data->ioc_count,
2535                                              data->ioc_plen1, async);
2536                 break;
2537         default:
2538                 rc = -EINVAL;
2539         }
2540
2541         echo_put_object(eco);
2542
2543         RETURN(rc);
2544 }
2545
2546 static int
2547 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2548                       void *karg, void __user *uarg)
2549 {
2550 #ifdef HAVE_SERVER_SUPPORT
2551         struct tgt_session_info *tsi;
2552 #endif
2553         struct obd_device      *obd = exp->exp_obd;
2554         struct echo_device     *ed = obd2echo_dev(obd);
2555         struct echo_client_obd *ec = ed->ed_ec;
2556         struct echo_object     *eco;
2557         struct obd_ioctl_data  *data = karg;
2558         struct lu_env          *env;
2559         struct obdo            *oa;
2560         struct lu_fid           fid;
2561         int                     rw = OBD_BRW_READ;
2562         int                     rc = 0;
2563 #ifdef HAVE_SERVER_SUPPORT
2564         struct lu_context        echo_session;
2565 #endif
2566         ENTRY;
2567
2568         oa = &data->ioc_obdo1;
2569         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2570                 oa->o_valid |= OBD_MD_FLGROUP;
2571                 ostid_set_seq_echo(&oa->o_oi);
2572         }
2573
2574         /* This FID is unpacked just for validation at this point */
2575         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2576         if (rc < 0)
2577                 RETURN(rc);
2578
2579         OBD_ALLOC_PTR(env);
2580         if (env == NULL)
2581                 RETURN(-ENOMEM);
2582
2583         rc = lu_env_init(env, LCT_DT_THREAD);
2584         if (rc)
2585                 GOTO(out_alloc, rc = -ENOMEM);
2586
2587 #ifdef HAVE_SERVER_SUPPORT
2588         env->le_ses = &echo_session;
2589         rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
2590         if (unlikely(rc < 0))
2591                 GOTO(out_env, rc);
2592         lu_context_enter(env->le_ses);
2593
2594         tsi = tgt_ses_info(env);
2595         tsi->tsi_exp = ec->ec_exp;
2596         tsi->tsi_jobid = NULL;
2597 #endif
2598         switch (cmd) {
2599         case OBD_IOC_CREATE:                    /* may create echo object */
2600                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2601                         GOTO (out, rc = -EPERM);
2602
2603                 rc = echo_create_object(env, ed, oa);
2604                 GOTO(out, rc);
2605
2606 #ifdef HAVE_SERVER_SUPPORT
2607         case OBD_IOC_ECHO_MD: {
2608                 int count;
2609                 int cmd;
2610                 char *dir = NULL;
2611                 int dirlen;
2612                 __u64 id;
2613
2614                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2615                         GOTO(out, rc = -EPERM);
2616
2617                 count = data->ioc_count;
2618                 cmd = data->ioc_command;
2619
2620                 id = data->ioc_obdo2.o_oi.oi.oi_id;
2621                 dirlen = data->ioc_plen1;
2622                 OBD_ALLOC(dir, dirlen + 1);
2623                 if (dir == NULL)
2624                         GOTO(out, rc = -ENOMEM);
2625
2626                 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2627                         OBD_FREE(dir, data->ioc_plen1 + 1);
2628                         GOTO(out, rc = -EFAULT);
2629                 }
2630
2631                 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2632                 OBD_FREE(dir, dirlen + 1);
2633                 GOTO(out, rc);
2634         }
2635         case OBD_IOC_ECHO_ALLOC_SEQ: {
2636                 struct lu_env   *cl_env;
2637                 __u16            refcheck;
2638                 __u64            seq;
2639                 int              max_count;
2640
2641                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2642                         GOTO(out, rc = -EPERM);
2643
2644                 cl_env = cl_env_get(&refcheck);
2645                 if (IS_ERR(cl_env))
2646                         GOTO(out, rc = PTR_ERR(cl_env));
2647
2648                 rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
2649                                             ECHO_MD_SES_TAG);
2650                 if (rc != 0) {
2651                         cl_env_put(cl_env, &refcheck);
2652                         GOTO(out, rc);
2653                 }
2654
2655                 rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
2656                 cl_env_put(cl_env, &refcheck);
2657                 if (rc < 0) {
2658                         CERROR("%s: Can not alloc seq: rc = %d\n",
2659                                obd->obd_name, rc);
2660                         GOTO(out, rc);
2661                 }
2662
2663                 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2664                         return -EFAULT;
2665
2666                 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2667                 if (copy_to_user(data->ioc_pbuf2, &max_count,
2668                                      data->ioc_plen2))
2669                         return -EFAULT;
2670                 GOTO(out, rc);
2671         }
2672 #endif /* HAVE_SERVER_SUPPORT */
2673         case OBD_IOC_DESTROY:
2674                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2675                         GOTO (out, rc = -EPERM);
2676
2677                 rc = echo_get_object(&eco, ed, oa);
2678                 if (rc == 0) {
2679                         rc = obd_destroy(env, ec->ec_exp, oa);
2680                         if (rc == 0)
2681                                 eco->eo_deleted = 1;
2682                         echo_put_object(eco);
2683                 }
2684                 GOTO(out, rc);
2685
2686         case OBD_IOC_GETATTR:
2687                 rc = echo_get_object(&eco, ed, oa);
2688                 if (rc == 0) {
2689                         rc = obd_getattr(env, ec->ec_exp, oa);
2690                         echo_put_object(eco);
2691                 }
2692                 GOTO(out, rc);
2693
2694         case OBD_IOC_SETATTR:
2695                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2696                         GOTO (out, rc = -EPERM);
2697
2698                 rc = echo_get_object(&eco, ed, oa);
2699                 if (rc == 0) {
2700                         rc = obd_setattr(env, ec->ec_exp, oa);
2701                         echo_put_object(eco);
2702                 }
2703                 GOTO(out, rc);
2704
2705         case OBD_IOC_BRW_WRITE:
2706                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2707                         GOTO (out, rc = -EPERM);
2708
2709                 rw = OBD_BRW_WRITE;
2710                 /* fall through */
2711         case OBD_IOC_BRW_READ:
2712                 rc = echo_client_brw_ioctl(env, rw, exp, data);
2713                 GOTO(out, rc);
2714
2715         default:
2716                 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2717                 GOTO (out, rc = -ENOTTY);
2718         }
2719
2720         EXIT;
2721 out:
2722 #ifdef HAVE_SERVER_SUPPORT
2723         lu_context_exit(env->le_ses);
2724         lu_context_fini(env->le_ses);
2725 out_env:
2726 #endif
2727         lu_env_fini(env);
2728 out_alloc:
2729         OBD_FREE_PTR(env);
2730
2731         return rc;
2732 }
2733
2734 static int echo_client_setup(const struct lu_env *env,
2735                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2736 {
2737         struct echo_client_obd *ec = &obddev->u.echo_client;
2738         struct obd_device *tgt;
2739         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2740         struct obd_connect_data *ocd = NULL;
2741         int rc;
2742         ENTRY;
2743
2744         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2745                 CERROR("requires a TARGET OBD name\n");
2746                 RETURN(-EINVAL);
2747         }
2748
2749         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2750         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2751                 CERROR("device not attached or not set up (%s)\n",
2752                        lustre_cfg_string(lcfg, 1));
2753                 RETURN(-EINVAL);
2754         }
2755
2756         spin_lock_init(&ec->ec_lock);
2757         INIT_LIST_HEAD(&ec->ec_objects);
2758         INIT_LIST_HEAD(&ec->ec_locks);
2759         ec->ec_unique = 0;
2760
2761         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2762 #ifdef HAVE_SERVER_SUPPORT
2763                 lu_context_tags_update(ECHO_MD_CTX_TAG);
2764                 lu_session_tags_update(ECHO_MD_SES_TAG);
2765 #else
2766                 CERROR("Local operations are NOT supported on client side. "
2767                        "Only remote operations are supported. Metadata client "
2768                        "must be run on server side.\n");
2769 #endif
2770                 RETURN(0);
2771         }
2772
2773         OBD_ALLOC(ocd, sizeof(*ocd));
2774         if (ocd == NULL) {
2775                 CERROR("Can't alloc ocd connecting to %s\n",
2776                        lustre_cfg_string(lcfg, 1));
2777                 return -ENOMEM;
2778         }
2779
2780         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2781                                  OBD_CONNECT_BRW_SIZE |
2782                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2783                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2784                                  OBD_CONNECT_FID;
2785         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2786         ocd->ocd_version = LUSTRE_VERSION_CODE;
2787         ocd->ocd_group = FID_SEQ_ECHO;
2788
2789         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2790         if (rc == 0) {
2791                 /* Turn off pinger because it connects to tgt obd directly. */
2792                 spin_lock(&tgt->obd_dev_lock);
2793                 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2794                 spin_unlock(&tgt->obd_dev_lock);
2795         }
2796
2797         OBD_FREE(ocd, sizeof(*ocd));
2798
2799         if (rc != 0) {
2800                 CERROR("fail to connect to device %s\n",
2801                        lustre_cfg_string(lcfg, 1));
2802                 return (rc);
2803         }
2804
2805         RETURN(rc);
2806 }
2807
2808 static int echo_client_cleanup(struct obd_device *obddev)
2809 {
2810         struct echo_device *ed = obd2echo_dev(obddev);
2811         struct echo_client_obd *ec = &obddev->u.echo_client;
2812         int rc;
2813         ENTRY;
2814
2815         /*Do nothing for Metadata echo client*/
2816         if (ed == NULL )
2817                 RETURN(0);
2818
2819         if (ed->ed_next_ismd) {
2820 #ifdef HAVE_SERVER_SUPPORT
2821                 lu_context_tags_clear(ECHO_MD_CTX_TAG);
2822                 lu_session_tags_clear(ECHO_MD_SES_TAG);
2823 #else
2824                 CERROR("This is client-side only module, does not support "
2825                         "metadata echo client.\n");
2826 #endif
2827                 RETURN(0);
2828         }
2829
2830         if (!list_empty(&obddev->obd_exports)) {
2831                 CERROR("still has clients!\n");
2832                 RETURN(-EBUSY);
2833         }
2834
2835         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2836         rc = obd_disconnect(ec->ec_exp);
2837         if (rc != 0)
2838                 CERROR("fail to disconnect device: %d\n", rc);
2839
2840         RETURN(rc);
2841 }
2842
2843 static int echo_client_connect(const struct lu_env *env,
2844                                struct obd_export **exp,
2845                                struct obd_device *src, struct obd_uuid *cluuid,
2846                                struct obd_connect_data *data, void *localdata)
2847 {
2848         int                rc;
2849         struct lustre_handle conn = { 0 };
2850
2851         ENTRY;
2852         rc = class_connect(&conn, src, cluuid);
2853         if (rc == 0) {
2854                 *exp = class_conn2export(&conn);
2855         }
2856
2857         RETURN (rc);
2858 }
2859
2860 static int echo_client_disconnect(struct obd_export *exp)
2861 {
2862         int                     rc;
2863         ENTRY;
2864
2865         if (exp == NULL)
2866                 GOTO(out, rc = -EINVAL);
2867
2868         rc = class_disconnect(exp);
2869         GOTO(out, rc);
2870  out:
2871         return rc;
2872 }
2873
2874 static struct obd_ops echo_client_obd_ops = {
2875         .o_owner       = THIS_MODULE,
2876         .o_iocontrol   = echo_client_iocontrol,
2877         .o_connect     = echo_client_connect,
2878         .o_disconnect  = echo_client_disconnect
2879 };
2880
2881 static int __init obdecho_init(void)
2882 {
2883         int rc;
2884
2885         ENTRY;
2886         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2887
2888         LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2889
2890 # ifdef HAVE_SERVER_SUPPORT
2891         rc = echo_persistent_pages_init();
2892         if (rc != 0)
2893                 goto failed_0;
2894
2895         rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
2896                                  LUSTRE_ECHO_NAME, NULL);
2897         if (rc != 0)
2898                 goto failed_1;
2899 # endif
2900
2901         rc = lu_kmem_init(echo_caches);
2902         if (rc == 0) {
2903                 rc = class_register_type(&echo_client_obd_ops, NULL, true, NULL,
2904                                          LUSTRE_ECHO_CLIENT_NAME,
2905                                          &echo_device_type);
2906                 if (rc)
2907                         lu_kmem_fini(echo_caches);
2908         }
2909
2910 # ifdef HAVE_SERVER_SUPPORT
2911         if (rc == 0)
2912                 RETURN(0);
2913
2914         class_unregister_type(LUSTRE_ECHO_NAME);
2915 failed_1:
2916         echo_persistent_pages_fini();
2917 failed_0:
2918 # endif
2919         RETURN(rc);
2920 }
2921
2922 static void __exit obdecho_exit(void)
2923 {
2924         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2925         lu_kmem_fini(echo_caches);
2926
2927 #ifdef HAVE_SERVER_SUPPORT
2928         class_unregister_type(LUSTRE_ECHO_NAME);
2929         echo_persistent_pages_fini();
2930 #endif
2931 }
2932
2933 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2934 MODULE_DESCRIPTION("Lustre Echo Client test driver");
2935 MODULE_VERSION(LUSTRE_VERSION_STRING);
2936 MODULE_LICENSE("GPL");
2937
2938 module_init(obdecho_init);
2939 module_exit(obdecho_exit);
2940
2941 /** @} echo_client */