Whamcloud - gitweb
cef8317472cf895be2abe64722ec97918421a474
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  */
38
39 #define DEBUG_SUBSYSTEM S_ECHO
40 #ifdef __KERNEL__
41 #include <libcfs/libcfs.h>
42 #else
43 #include <liblustre.h>
44 #endif
45
46 #include <obd.h>
47 #include <obd_support.h>
48 #include <obd_class.h>
49 #include <lustre_debug.h>
50 #include <lprocfs_status.h>
51 #include <cl_object.h>
52 #include <lustre_fid.h>
53 #include <lustre_acl.h>
54 #include <lustre_net.h>
55
56 #include "echo_internal.h"
57
58 /** \defgroup echo_client Echo Client
59  * @{
60  */
61
62 struct echo_device {
63         struct cl_device        ed_cl;
64         struct echo_client_obd *ed_ec;
65
66         struct cl_site          ed_site_myself;
67         struct cl_site         *ed_site;
68         struct lu_device       *ed_next;
69         int                     ed_next_islov;
70         int                     ed_next_ismd;
71         struct lu_client_seq   *ed_cl_seq;
72 };
73
74 struct echo_object {
75         struct cl_object        eo_cl;
76         struct cl_object_header eo_hdr;
77
78         struct echo_device     *eo_dev;
79         cfs_list_t              eo_obj_chain;
80         struct lov_stripe_md   *eo_lsm;
81         cfs_atomic_t            eo_npages;
82         int                     eo_deleted;
83 };
84
85 struct echo_object_conf {
86         struct cl_object_conf  eoc_cl;
87         struct lov_stripe_md **eoc_md;
88 };
89
90 struct echo_page {
91         struct cl_page_slice   ep_cl;
92         cfs_mutex_t            ep_lock;
93         cfs_page_t            *ep_vmpage;
94 };
95
96 struct echo_lock {
97         struct cl_lock_slice   el_cl;
98         cfs_list_t             el_chain;
99         struct echo_object    *el_object;
100         __u64                  el_cookie;
101         cfs_atomic_t           el_refcount;
102 };
103
104 struct echo_io {
105         struct cl_io_slice     ei_cl;
106 };
107
108 #if 0
109 struct echo_req {
110         struct cl_req_slice er_cl;
111 };
112 #endif
113
114 static int echo_client_setup(const struct lu_env *env,
115                              struct obd_device *obddev,
116                              struct lustre_cfg *lcfg);
117 static int echo_client_cleanup(struct obd_device *obddev);
118
119
120 /** \defgroup echo_helpers Helper functions
121  * @{
122  */
123 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
124 {
125         return container_of0(dev, struct echo_device, ed_cl);
126 }
127
128 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
129 {
130         return &d->ed_cl;
131 }
132
133 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
134 {
135         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
136 }
137
138 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
139 {
140         return &eco->eo_cl;
141 }
142
143 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
144 {
145         return container_of(o, struct echo_object, eo_cl);
146 }
147
148 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
149 {
150         return container_of(s, struct echo_page, ep_cl);
151 }
152
153 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
154 {
155         return container_of(s, struct echo_lock, el_cl);
156 }
157
158 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
159 {
160         return ecl->el_cl.cls_lock;
161 }
162
163 static struct lu_context_key echo_thread_key;
164 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
165 {
166         struct echo_thread_info *info;
167         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
168         LASSERT(info != NULL);
169         return info;
170 }
171
172 static inline
173 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
174 {
175         return container_of(c, struct echo_object_conf, eoc_cl);
176 }
177
178 static inline void lsm2fid(struct lov_stripe_md *lsm, struct lu_fid *fid)
179 {
180         fid_zero(fid);
181         fid->f_seq = FID_SEQ_ECHO;
182         /* truncated to 32 bits by assignment */
183         fid->f_oid = lsm->lsm_object_id;
184         fid->f_ver = lsm->lsm_object_id >> 32;
185 }
186 /** @} echo_helpers */
187
188 static struct echo_object *cl_echo_object_find(struct echo_device *d,
189                                                struct lov_stripe_md **lsm);
190 static int cl_echo_object_put(struct echo_object *eco);
191 static int cl_echo_enqueue   (struct echo_object *eco, obd_off start,
192                               obd_off end, int mode, __u64 *cookie);
193 static int cl_echo_cancel    (struct echo_device *d, __u64 cookie);
194 static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
195                               cfs_page_t **pages, int npages, int async);
196
197 static struct echo_thread_info *echo_env_info(const struct lu_env *env);
198
199 struct echo_thread_info {
200         struct echo_object_conf eti_conf;
201         struct lustre_md        eti_md;
202
203         struct cl_2queue        eti_queue;
204         struct cl_io            eti_io;
205         struct cl_lock_descr    eti_descr;
206         struct lu_fid           eti_fid;
207         struct md_op_spec       eti_spec;
208         struct lov_mds_md_v3    eti_lmm;
209         struct lov_user_md_v3   eti_lum;
210         struct md_attr          eti_ma;
211         struct lu_name          eti_lname;
212         char                    eti_name[20];
213         struct lu_buf           eti_buf;
214         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
215 };
216
217 /* No session used right now */
218 struct echo_session_info {
219         unsigned long dummy;
220 };
221
222 static cfs_mem_cache_t *echo_page_kmem;
223 static cfs_mem_cache_t *echo_lock_kmem;
224 static cfs_mem_cache_t *echo_object_kmem;
225 static cfs_mem_cache_t *echo_thread_kmem;
226 static cfs_mem_cache_t *echo_session_kmem;
227 //static cfs_mem_cache_t *echo_req_kmem;
228
229 static struct lu_kmem_descr echo_caches[] = {
230         {
231                 .ckd_cache = &echo_page_kmem,
232                 .ckd_name  = "echo_page_kmem",
233                 .ckd_size  = sizeof (struct echo_page)
234         },
235         {
236                 .ckd_cache = &echo_lock_kmem,
237                 .ckd_name  = "echo_lock_kmem",
238                 .ckd_size  = sizeof (struct echo_lock)
239         },
240         {
241                 .ckd_cache = &echo_object_kmem,
242                 .ckd_name  = "echo_object_kmem",
243                 .ckd_size  = sizeof (struct echo_object)
244         },
245         {
246                 .ckd_cache = &echo_thread_kmem,
247                 .ckd_name  = "echo_thread_kmem",
248                 .ckd_size  = sizeof (struct echo_thread_info)
249         },
250         {
251                 .ckd_cache = &echo_session_kmem,
252                 .ckd_name  = "echo_session_kmem",
253                 .ckd_size  = sizeof (struct echo_session_info)
254         },
255 #if 0
256         {
257                 .ckd_cache = &echo_req_kmem,
258                 .ckd_name  = "echo_req_kmem",
259                 .ckd_size  = sizeof (struct echo_req)
260         },
261 #endif
262         {
263                 .ckd_cache = NULL
264         }
265 };
266
267 /** \defgroup echo_page Page operations
268  *
269  * Echo page operations.
270  *
271  * @{
272  */
273 cfs_page_t *echo_page_vmpage(const struct lu_env *env,
274                              const struct cl_page_slice *slice)
275 {
276         return cl2echo_page(slice)->ep_vmpage;
277 }
278
279 static int echo_page_own(const struct lu_env *env,
280                          const struct cl_page_slice *slice,
281                          struct cl_io *io, int nonblock)
282 {
283         struct echo_page *ep = cl2echo_page(slice);
284
285         if (!nonblock)
286                 cfs_mutex_lock(&ep->ep_lock);
287         else if (!cfs_mutex_trylock(&ep->ep_lock))
288                 return -EAGAIN;
289         return 0;
290 }
291
292 static void echo_page_disown(const struct lu_env *env,
293                              const struct cl_page_slice *slice,
294                              struct cl_io *io)
295 {
296         struct echo_page *ep = cl2echo_page(slice);
297
298         LASSERT(cfs_mutex_is_locked(&ep->ep_lock));
299         cfs_mutex_unlock(&ep->ep_lock);
300 }
301
302 static void echo_page_discard(const struct lu_env *env,
303                               const struct cl_page_slice *slice,
304                               struct cl_io *unused)
305 {
306         cl_page_delete(env, slice->cpl_page);
307 }
308
309 static int echo_page_is_vmlocked(const struct lu_env *env,
310                                  const struct cl_page_slice *slice)
311 {
312         if (cfs_mutex_is_locked(&cl2echo_page(slice)->ep_lock))
313                 return -EBUSY;
314         return -ENODATA;
315 }
316
317 static void echo_page_completion(const struct lu_env *env,
318                                  const struct cl_page_slice *slice,
319                                  int ioret)
320 {
321         LASSERT(slice->cpl_page->cp_sync_io != NULL);
322 }
323
324 static void echo_page_fini(const struct lu_env *env,
325                            struct cl_page_slice *slice)
326 {
327         struct echo_page *ep    = cl2echo_page(slice);
328         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
329         cfs_page_t *vmpage      = ep->ep_vmpage;
330         ENTRY;
331
332         cfs_atomic_dec(&eco->eo_npages);
333         page_cache_release(vmpage);
334         OBD_SLAB_FREE_PTR(ep, echo_page_kmem);
335         EXIT;
336 }
337
338 static int echo_page_prep(const struct lu_env *env,
339                           const struct cl_page_slice *slice,
340                           struct cl_io *unused)
341 {
342         return 0;
343 }
344
345 static int echo_page_print(const struct lu_env *env,
346                            const struct cl_page_slice *slice,
347                            void *cookie, lu_printer_t printer)
348 {
349         struct echo_page *ep = cl2echo_page(slice);
350
351         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
352                    ep, cfs_mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
353         return 0;
354 }
355
356 static const struct cl_page_operations echo_page_ops = {
357         .cpo_own           = echo_page_own,
358         .cpo_disown        = echo_page_disown,
359         .cpo_discard       = echo_page_discard,
360         .cpo_vmpage        = echo_page_vmpage,
361         .cpo_fini          = echo_page_fini,
362         .cpo_print         = echo_page_print,
363         .cpo_is_vmlocked   = echo_page_is_vmlocked,
364         .io = {
365                 [CRT_READ] = {
366                         .cpo_prep        = echo_page_prep,
367                         .cpo_completion  = echo_page_completion,
368                 },
369                 [CRT_WRITE] = {
370                         .cpo_prep        = echo_page_prep,
371                         .cpo_completion  = echo_page_completion,
372                 }
373         }
374 };
375 /** @} echo_page */
376
377 /** \defgroup echo_lock Locking
378  *
379  * echo lock operations
380  *
381  * @{
382  */
383 static void echo_lock_fini(const struct lu_env *env,
384                            struct cl_lock_slice *slice)
385 {
386         struct echo_lock *ecl = cl2echo_lock(slice);
387
388         LASSERT(cfs_list_empty(&ecl->el_chain));
389         OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
390 }
391
392 static void echo_lock_delete(const struct lu_env *env,
393                              const struct cl_lock_slice *slice)
394 {
395         struct echo_lock *ecl      = cl2echo_lock(slice);
396
397         LASSERT(cfs_list_empty(&ecl->el_chain));
398 }
399
400 static int echo_lock_fits_into(const struct lu_env *env,
401                                const struct cl_lock_slice *slice,
402                                const struct cl_lock_descr *need,
403                                const struct cl_io *unused)
404 {
405         return 1;
406 }
407
408 static struct cl_lock_operations echo_lock_ops = {
409         .clo_fini      = echo_lock_fini,
410         .clo_delete    = echo_lock_delete,
411         .clo_fits_into = echo_lock_fits_into
412 };
413
414 /** @} echo_lock */
415
416 /** \defgroup echo_cl_ops cl_object operations
417  *
418  * operations for cl_object
419  *
420  * @{
421  */
422 static struct cl_page *echo_page_init(const struct lu_env *env,
423                                       struct cl_object *obj,
424                                       struct cl_page *page, cfs_page_t *vmpage)
425 {
426         struct echo_page *ep;
427         ENTRY;
428
429         OBD_SLAB_ALLOC_PTR_GFP(ep, echo_page_kmem, CFS_ALLOC_IO);
430         if (ep != NULL) {
431                 struct echo_object *eco = cl2echo_obj(obj);
432                 ep->ep_vmpage = vmpage;
433                 page_cache_get(vmpage);
434                 cfs_mutex_init(&ep->ep_lock);
435                 cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
436                 cfs_atomic_inc(&eco->eo_npages);
437         }
438         RETURN(ERR_PTR(ep ? 0 : -ENOMEM));
439 }
440
441 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
442                         struct cl_io *io)
443 {
444         return 0;
445 }
446
447 static int echo_lock_init(const struct lu_env *env,
448                           struct cl_object *obj, struct cl_lock *lock,
449                           const struct cl_io *unused)
450 {
451         struct echo_lock *el;
452         ENTRY;
453
454         OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, CFS_ALLOC_IO);
455         if (el != NULL) {
456                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
457                 el->el_object = cl2echo_obj(obj);
458                 CFS_INIT_LIST_HEAD(&el->el_chain);
459                 cfs_atomic_set(&el->el_refcount, 0);
460         }
461         RETURN(el == NULL ? -ENOMEM : 0);
462 }
463
464 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
465                          const struct cl_object_conf *conf)
466 {
467         return 0;
468 }
469
470 static const struct cl_object_operations echo_cl_obj_ops = {
471         .coo_page_init = echo_page_init,
472         .coo_lock_init = echo_lock_init,
473         .coo_io_init   = echo_io_init,
474         .coo_conf_set  = echo_conf_set
475 };
476 /** @} echo_cl_ops */
477
478 /** \defgroup echo_lu_ops lu_object operations
479  *
480  * operations for echo lu object.
481  *
482  * @{
483  */
484 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
485                             const struct lu_object_conf *conf)
486 {
487         struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
488         struct echo_client_obd *ec     = ed->ed_ec;
489         struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
490         ENTRY;
491
492         if (ed->ed_next) {
493                 struct lu_object  *below;
494                 struct lu_device  *under;
495
496                 under = ed->ed_next;
497                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
498                                                         under);
499                 if (below == NULL)
500                         RETURN(-ENOMEM);
501                 lu_object_add(obj, below);
502         }
503
504         if (!ed->ed_next_ismd) {
505                 const struct cl_object_conf *cconf = lu2cl_conf(conf);
506                 struct echo_object_conf *econf = cl2echo_conf(cconf);
507
508                 LASSERT(econf->eoc_md);
509                 eco->eo_lsm = *econf->eoc_md;
510                 /* clear the lsm pointer so that it won't get freed. */
511                 *econf->eoc_md = NULL;
512         } else {
513                 eco->eo_lsm = NULL;
514         }
515
516         eco->eo_dev = ed;
517         cfs_atomic_set(&eco->eo_npages, 0);
518
519         cfs_spin_lock(&ec->ec_lock);
520         cfs_list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
521         cfs_spin_unlock(&ec->ec_lock);
522
523         RETURN(0);
524 }
525
526 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
527 {
528         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
529         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
530         struct lov_stripe_md *lsm  = eco->eo_lsm;
531         ENTRY;
532
533         LASSERT(cfs_atomic_read(&eco->eo_npages) == 0);
534
535         cfs_spin_lock(&ec->ec_lock);
536         cfs_list_del_init(&eco->eo_obj_chain);
537         cfs_spin_unlock(&ec->ec_lock);
538
539         lu_object_fini(obj);
540         lu_object_header_fini(obj->lo_header);
541
542         if (lsm)
543                 obd_free_memmd(ec->ec_exp, &lsm);
544         OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
545         EXIT;
546 }
547
548 static int echo_object_print(const struct lu_env *env, void *cookie,
549                             lu_printer_t p, const struct lu_object *o)
550 {
551         struct echo_object *obj = cl2echo_obj(lu2cl(o));
552
553         return (*p)(env, cookie, "echoclient-object@%p", obj);
554 }
555
556 static const struct lu_object_operations echo_lu_obj_ops = {
557         .loo_object_init      = echo_object_init,
558         .loo_object_delete    = NULL,
559         .loo_object_release   = NULL,
560         .loo_object_free      = echo_object_free,
561         .loo_object_print     = echo_object_print,
562         .loo_object_invariant = NULL
563 };
564 /** @} echo_lu_ops */
565
566 /** \defgroup echo_lu_dev_ops  lu_device operations
567  *
568  * Operations for echo lu device.
569  *
570  * @{
571  */
572 static struct lu_object *echo_object_alloc(const struct lu_env *env,
573                                            const struct lu_object_header *hdr,
574                                            struct lu_device *dev)
575 {
576         struct echo_object *eco;
577         struct lu_object *obj = NULL;
578         ENTRY;
579
580         /* we're the top dev. */
581         LASSERT(hdr == NULL);
582         OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, CFS_ALLOC_IO);
583         if (eco != NULL) {
584                 struct cl_object_header *hdr = &eco->eo_hdr;
585
586                 obj = &echo_obj2cl(eco)->co_lu;
587                 cl_object_header_init(hdr);
588                 lu_object_init(obj, &hdr->coh_lu, dev);
589                 lu_object_add_top(&hdr->coh_lu, obj);
590
591                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
592                 obj->lo_ops       = &echo_lu_obj_ops;
593         }
594         RETURN(obj);
595 }
596
597 static struct lu_device_operations echo_device_lu_ops = {
598         .ldo_object_alloc   = echo_object_alloc,
599 };
600
601 /** @} echo_lu_dev_ops */
602
603 static struct cl_device_operations echo_device_cl_ops = {
604 };
605
606 /** \defgroup echo_init Setup and teardown
607  *
608  * Init and fini functions for echo client.
609  *
610  * @{
611  */
612 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
613 {
614         struct cl_site *site = &ed->ed_site_myself;
615         int rc;
616
617         /* initialize site */
618         rc = cl_site_init(site, &ed->ed_cl);
619         if (rc) {
620                 CERROR("Cannot initilize site for echo client(%d)\n", rc);
621                 return rc;
622         }
623
624         rc = lu_site_init_finish(&site->cs_lu);
625         if (rc)
626                 return rc;
627
628         ed->ed_site = site;
629         return 0;
630 }
631
632 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
633 {
634         if (ed->ed_site) {
635                 if (!ed->ed_next_ismd)
636                         cl_site_fini(ed->ed_site);
637                 ed->ed_site = NULL;
638         }
639 }
640
641 static void *echo_thread_key_init(const struct lu_context *ctx,
642                           struct lu_context_key *key)
643 {
644         struct echo_thread_info *info;
645
646         OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, CFS_ALLOC_IO);
647         if (info == NULL)
648                 info = ERR_PTR(-ENOMEM);
649         return info;
650 }
651
652 static void echo_thread_key_fini(const struct lu_context *ctx,
653                          struct lu_context_key *key, void *data)
654 {
655         struct echo_thread_info *info = data;
656         OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
657 }
658
659 static void echo_thread_key_exit(const struct lu_context *ctx,
660                          struct lu_context_key *key, void *data)
661 {
662 }
663
664 static struct lu_context_key echo_thread_key = {
665         .lct_tags = LCT_CL_THREAD,
666         .lct_init = echo_thread_key_init,
667         .lct_fini = echo_thread_key_fini,
668         .lct_exit = echo_thread_key_exit
669 };
670
671 static void *echo_session_key_init(const struct lu_context *ctx,
672                                   struct lu_context_key *key)
673 {
674         struct echo_session_info *session;
675
676         OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, CFS_ALLOC_IO);
677         if (session == NULL)
678                 session = ERR_PTR(-ENOMEM);
679         return session;
680 }
681
682 static void echo_session_key_fini(const struct lu_context *ctx,
683                                  struct lu_context_key *key, void *data)
684 {
685         struct echo_session_info *session = data;
686         OBD_SLAB_FREE_PTR(session, echo_session_kmem);
687 }
688
689 static void echo_session_key_exit(const struct lu_context *ctx,
690                                  struct lu_context_key *key, void *data)
691 {
692 }
693
694 static struct lu_context_key echo_session_key = {
695         .lct_tags = LCT_SESSION,
696         .lct_init = echo_session_key_init,
697         .lct_fini = echo_session_key_fini,
698         .lct_exit = echo_session_key_exit
699 };
700
701 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
702
703 #define ECHO_SEQ_WIDTH 0xffffffff
704 static int echo_fid_init(struct echo_device *ed, char *obd_name,
705                          struct md_site *ms)
706 {
707         char *prefix;
708         int rc;
709         ENTRY;
710
711         OBD_ALLOC_PTR(ed->ed_cl_seq);
712         if (ed->ed_cl_seq == NULL)
713                 RETURN(-ENOMEM);
714
715         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
716         if (prefix == NULL)
717                 GOTO(out_free_seq, rc = -ENOMEM);
718
719         snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
720
721         /* Init client side sequence-manager */
722         rc = seq_client_init(ed->ed_cl_seq, NULL,
723                              LUSTRE_SEQ_METADATA,
724                              prefix, ms->ms_server_seq);
725         ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
726         OBD_FREE(prefix, MAX_OBD_NAME + 5);
727         if (rc)
728                 GOTO(out_free_seq, rc);
729
730         RETURN(0);
731
732 out_free_seq:
733         OBD_FREE_PTR(ed->ed_cl_seq);
734         ed->ed_cl_seq = NULL;
735         RETURN(rc);
736 }
737
738 static int echo_fid_fini(struct obd_device *obddev)
739 {
740         struct echo_device *ed = obd2echo_dev(obddev);
741         ENTRY;
742
743         if (ed->ed_cl_seq != NULL) {
744                 seq_client_fini(ed->ed_cl_seq);
745                 OBD_FREE_PTR(ed->ed_cl_seq);
746                 ed->ed_cl_seq = NULL;
747         }
748
749         RETURN(0);
750 }
751
752 static struct lu_device *echo_device_alloc(const struct lu_env *env,
753                                            struct lu_device_type *t,
754                                            struct lustre_cfg *cfg)
755 {
756         struct lu_device   *next;
757         struct echo_device *ed;
758         struct cl_device   *cd;
759         struct obd_device  *obd = NULL; /* to keep compiler happy */
760         struct obd_device  *tgt;
761         const char *tgt_type_name;
762         int rc;
763         int cleanup = 0;
764         ENTRY;
765
766         OBD_ALLOC_PTR(ed);
767         if (ed == NULL)
768                 GOTO(out, rc = -ENOMEM);
769
770         cleanup = 1;
771         cd = &ed->ed_cl;
772         rc = cl_device_init(cd, t);
773         if (rc)
774                 GOTO(out, rc);
775
776         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
777         cd->cd_ops = &echo_device_cl_ops;
778
779         cleanup = 2;
780         obd = class_name2obd(lustre_cfg_string(cfg, 0));
781         LASSERT(obd != NULL);
782         LASSERT(env != NULL);
783
784         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
785         if (tgt == NULL) {
786                 CERROR("Can not find tgt device %s\n",
787                         lustre_cfg_string(cfg, 1));
788                 GOTO(out, rc = -ENODEV);
789         }
790
791         next = tgt->obd_lu_dev;
792         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
793                 ed->ed_next_ismd = 1;
794         } else {
795                 ed->ed_next_ismd = 0;
796                 rc = echo_site_init(env, ed);
797                 if (rc)
798                         GOTO(out, rc);
799         }
800         cleanup = 3;
801
802         rc = echo_client_setup(env, obd, cfg);
803         if (rc)
804                 GOTO(out, rc);
805
806         ed->ed_ec = &obd->u.echo_client;
807         cleanup = 4;
808
809         if (ed->ed_next_ismd) {
810                 /* Suppose to connect to some Metadata layer */
811                 struct lu_site *ls;
812                 struct lu_device *ld;
813                 int    found = 0;
814
815                 if (next == NULL) {
816                         CERROR("%s is not lu device type!\n",
817                                lustre_cfg_string(cfg, 1));
818                         GOTO(out, rc = -EINVAL);
819                 }
820
821                 tgt_type_name = lustre_cfg_string(cfg, 2);
822                 if (!tgt_type_name) {
823                         CERROR("%s no type name for echo %s setup\n",
824                                 lustre_cfg_string(cfg, 1),
825                                 tgt->obd_type->typ_name);
826                         GOTO(out, rc = -EINVAL);
827                 }
828
829                 ls = next->ld_site;
830
831                 cfs_spin_lock(&ls->ls_ld_lock);
832                 cfs_list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
833                         if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
834                                 found = 1;
835                                 break;
836                         }
837                 }
838                 cfs_spin_unlock(&ls->ls_ld_lock);
839
840                 if (found == 0) {
841                         CERROR("%s is not lu device type!\n",
842                                lustre_cfg_string(cfg, 1));
843                         GOTO(out, rc = -EINVAL);
844                 }
845
846                 next = ld;
847                 /* For MD echo client, it will use the site in MDS stack */
848                 ed->ed_site_myself.cs_lu = *ls;
849                 ed->ed_site = &ed->ed_site_myself;
850                 ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu;
851                 rc = echo_fid_init(ed, obd->obd_name, lu_site2md(ls));
852                 if (rc) {
853                         CERROR("echo fid init error %d\n", rc);
854                         GOTO(out, rc);
855                 }
856         } else {
857                  /* if echo client is to be stacked upon ost device, the next is
858                   * NULL since ost is not a clio device so far */
859                 if (next != NULL && !lu_device_is_cl(next))
860                         next = NULL;
861
862                 tgt_type_name = tgt->obd_type->typ_name;
863                 if (next != NULL) {
864                         LASSERT(next != NULL);
865                         if (next->ld_site != NULL)
866                                 GOTO(out, rc = -EBUSY);
867
868                         next->ld_site = &ed->ed_site->cs_lu;
869                         rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
870                                                      next->ld_type->ldt_name,
871                                                      NULL);
872                         if (rc)
873                                 GOTO(out, rc);
874
875                         /* Tricky case, I have to determine the obd type since
876                          * CLIO uses the different parameters to initialize
877                          * objects for lov & osc. */
878                         if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
879                                 ed->ed_next_islov = 1;
880                         else
881                                 LASSERT(strcmp(tgt_type_name,
882                                                LUSTRE_OSC_NAME) == 0);
883                 } else
884                         LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
885         }
886
887         ed->ed_next = next;
888         RETURN(&cd->cd_lu_dev);
889 out:
890         switch(cleanup) {
891         case 4: {
892                 int rc2;
893                 rc2 = echo_client_cleanup(obd);
894                 if (rc2)
895                         CERROR("Cleanup obd device %s error(%d)\n",
896                                obd->obd_name, rc2);
897         }
898
899         case 3:
900                 echo_site_fini(env, ed);
901         case 2:
902                 cl_device_fini(&ed->ed_cl);
903         case 1:
904                 OBD_FREE_PTR(ed);
905         case 0:
906         default:
907                 break;
908         }
909         return(ERR_PTR(rc));
910 }
911
912 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
913                           const char *name, struct lu_device *next)
914 {
915         LBUG();
916         return 0;
917 }
918
919 static struct lu_device *echo_device_fini(const struct lu_env *env,
920                                           struct lu_device *d)
921 {
922         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
923         struct lu_device *next = ed->ed_next;
924
925         while (next && !ed->ed_next_ismd)
926                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
927         return NULL;
928 }
929
930 static void echo_lock_release(const struct lu_env *env,
931                               struct echo_lock *ecl,
932                               int still_used)
933 {
934         struct cl_lock *clk = echo_lock2cl(ecl);
935
936         cl_lock_get(clk);
937         cl_unuse(env, clk);
938         cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
939         if (!still_used) {
940                 cl_lock_mutex_get(env, clk);
941                 cl_lock_cancel(env, clk);
942                 cl_lock_delete(env, clk);
943                 cl_lock_mutex_put(env, clk);
944         }
945         cl_lock_put(env, clk);
946 }
947
948 static struct lu_device *echo_device_free(const struct lu_env *env,
949                                           struct lu_device *d)
950 {
951         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
952         struct echo_client_obd *ec   = ed->ed_ec;
953         struct echo_object     *eco;
954         struct lu_device       *next = ed->ed_next;
955
956         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
957                ed, next);
958
959         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
960
961         /* check if there are objects still alive.
962          * It shouldn't have any object because lu_site_purge would cleanup
963          * all of cached objects. Anyway, probably the echo device is being
964          * parallelly accessed.
965          */
966         cfs_spin_lock(&ec->ec_lock);
967         cfs_list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
968                 eco->eo_deleted = 1;
969         cfs_spin_unlock(&ec->ec_lock);
970
971         /* purge again */
972         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
973
974         CDEBUG(D_INFO,
975                "Waiting for the reference of echo object to be dropped\n");
976
977         /* Wait for the last reference to be dropped. */
978         cfs_spin_lock(&ec->ec_lock);
979         while (!cfs_list_empty(&ec->ec_objects)) {
980                 cfs_spin_unlock(&ec->ec_lock);
981                 CERROR("echo_client still has objects at cleanup time, "
982                        "wait for 1 second\n");
983                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
984                                                    cfs_time_seconds(1));
985                 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
986                 cfs_spin_lock(&ec->ec_lock);
987         }
988         cfs_spin_unlock(&ec->ec_lock);
989
990         LASSERT(cfs_list_empty(&ec->ec_locks));
991
992         CDEBUG(D_INFO, "No object exists, exiting...\n");
993
994         echo_client_cleanup(d->ld_obd);
995         echo_fid_fini(d->ld_obd);
996         while (next && !ed->ed_next_ismd)
997                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
998
999         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
1000         echo_site_fini(env, ed);
1001         cl_device_fini(&ed->ed_cl);
1002         OBD_FREE_PTR(ed);
1003
1004         return NULL;
1005 }
1006
1007 static const struct lu_device_type_operations echo_device_type_ops = {
1008         .ldto_init = echo_type_init,
1009         .ldto_fini = echo_type_fini,
1010
1011         .ldto_start = echo_type_start,
1012         .ldto_stop  = echo_type_stop,
1013
1014         .ldto_device_alloc = echo_device_alloc,
1015         .ldto_device_free  = echo_device_free,
1016         .ldto_device_init  = echo_device_init,
1017         .ldto_device_fini  = echo_device_fini
1018 };
1019
1020 static struct lu_device_type echo_device_type = {
1021         .ldt_tags     = LU_DEVICE_CL,
1022         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
1023         .ldt_ops      = &echo_device_type_ops,
1024         .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1025 };
1026 /** @} echo_init */
1027
1028 /** \defgroup echo_exports Exported operations
1029  *
1030  * exporting functions to echo client
1031  *
1032  * @{
1033  */
1034
1035 /* Interfaces to echo client obd device */
1036 static struct echo_object *cl_echo_object_find(struct echo_device *d,
1037                                                struct lov_stripe_md **lsmp)
1038 {
1039         struct lu_env *env;
1040         struct echo_thread_info *info;
1041         struct echo_object_conf *conf;
1042         struct lov_stripe_md    *lsm;
1043         struct echo_object *eco;
1044         struct cl_object   *obj;
1045         struct lu_fid *fid;
1046         int refcheck;
1047         ENTRY;
1048
1049         LASSERT(lsmp);
1050         lsm = *lsmp;
1051         LASSERT(lsm);
1052         LASSERT(lsm->lsm_object_id);
1053
1054         /* Never return an object if the obd is to be freed. */
1055         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1056                 RETURN(ERR_PTR(-ENODEV));
1057
1058         env = cl_env_get(&refcheck);
1059         if (IS_ERR(env))
1060                 RETURN((void *)env);
1061
1062         info = echo_env_info(env);
1063         conf = &info->eti_conf;
1064         if (d->ed_next) {
1065                 if (!d->ed_next_islov) {
1066                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
1067                         LASSERT(oinfo != NULL);
1068                         oinfo->loi_id = lsm->lsm_object_id;
1069                         oinfo->loi_seq = lsm->lsm_object_seq;
1070                         conf->eoc_cl.u.coc_oinfo = oinfo;
1071                 } else {
1072                         struct lustre_md *md;
1073                         md = &info->eti_md;
1074                         memset(md, 0, sizeof *md);
1075                         md->lsm = lsm;
1076                         conf->eoc_cl.u.coc_md = md;
1077                 }
1078         }
1079         conf->eoc_md = lsmp;
1080
1081         fid  = &info->eti_fid;
1082         lsm2fid(lsm, fid);
1083
1084         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1085         if (IS_ERR(obj))
1086                 GOTO(out, eco = (void*)obj);
1087
1088         eco = cl2echo_obj(obj);
1089         if (eco->eo_deleted) {
1090                 cl_object_put(env, obj);
1091                 eco = ERR_PTR(-EAGAIN);
1092         }
1093
1094 out:
1095         cl_env_put(env, &refcheck);
1096         RETURN(eco);
1097 }
1098
1099 static int cl_echo_object_put(struct echo_object *eco)
1100 {
1101         struct lu_env *env;
1102         struct cl_object *obj = echo_obj2cl(eco);
1103         int refcheck;
1104         ENTRY;
1105
1106         env = cl_env_get(&refcheck);
1107         if (IS_ERR(env))
1108                 RETURN(PTR_ERR(env));
1109
1110         /* an external function to kill an object? */
1111         if (eco->eo_deleted) {
1112                 struct lu_object_header *loh = obj->co_lu.lo_header;
1113                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1114                 cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1115         }
1116
1117         cl_object_put(env, obj);
1118         cl_env_put(env, &refcheck);
1119         RETURN(0);
1120 }
1121
1122 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1123                             obd_off start, obd_off end, int mode,
1124                             __u64 *cookie , __u32 enqflags)
1125 {
1126         struct cl_io *io;
1127         struct cl_lock *lck;
1128         struct cl_object *obj;
1129         struct cl_lock_descr *descr;
1130         struct echo_thread_info *info;
1131         int rc = -ENOMEM;
1132         ENTRY;
1133
1134         info = echo_env_info(env);
1135         io = &info->eti_io;
1136         descr = &info->eti_descr;
1137         obj = echo_obj2cl(eco);
1138
1139         descr->cld_obj   = obj;
1140         descr->cld_start = cl_index(obj, start);
1141         descr->cld_end   = cl_index(obj, end);
1142         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1143         descr->cld_enq_flags = enqflags;
1144         io->ci_obj = obj;
1145
1146         lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1147         if (lck) {
1148                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1149                 struct echo_lock *el;
1150
1151                 rc = cl_wait(env, lck);
1152                 if (rc == 0) {
1153                         el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1154                         cfs_spin_lock(&ec->ec_lock);
1155                         if (cfs_list_empty(&el->el_chain)) {
1156                                 cfs_list_add(&el->el_chain, &ec->ec_locks);
1157                                 el->el_cookie = ++ec->ec_unique;
1158                         }
1159                         cfs_atomic_inc(&el->el_refcount);
1160                         *cookie = el->el_cookie;
1161                         cfs_spin_unlock(&ec->ec_lock);
1162                 } else
1163                         cl_lock_release(env, lck, "ec enqueue", cfs_current());
1164         }
1165         RETURN(rc);
1166 }
1167
1168 static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end,
1169                            int mode, __u64 *cookie)
1170 {
1171         struct echo_thread_info *info;
1172         struct lu_env *env;
1173         struct cl_io *io;
1174         int refcheck;
1175         int result;
1176         ENTRY;
1177
1178         env = cl_env_get(&refcheck);
1179         if (IS_ERR(env))
1180                 RETURN(PTR_ERR(env));
1181
1182         info = echo_env_info(env);
1183         io = &info->eti_io;
1184
1185         result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
1186         if (result < 0)
1187                 GOTO(out, result);
1188         LASSERT(result == 0);
1189
1190         result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
1191         cl_io_fini(env, io);
1192
1193         EXIT;
1194 out:
1195         cl_env_put(env, &refcheck);
1196         return result;
1197 }
1198
1199 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1200                            __u64 cookie)
1201 {
1202         struct echo_client_obd *ec = ed->ed_ec;
1203         struct echo_lock       *ecl = NULL;
1204         cfs_list_t             *el;
1205         int found = 0, still_used = 0;
1206         ENTRY;
1207
1208         LASSERT(ec != NULL);
1209         cfs_spin_lock (&ec->ec_lock);
1210         cfs_list_for_each (el, &ec->ec_locks) {
1211                 ecl = cfs_list_entry (el, struct echo_lock, el_chain);
1212                 CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
1213                 found = (ecl->el_cookie == cookie);
1214                 if (found) {
1215                         if (cfs_atomic_dec_and_test(&ecl->el_refcount))
1216                                 cfs_list_del_init(&ecl->el_chain);
1217                         else
1218                                 still_used = 1;
1219                         break;
1220                 }
1221         }
1222         cfs_spin_unlock (&ec->ec_lock);
1223
1224         if (!found)
1225                 RETURN(-ENOENT);
1226
1227         echo_lock_release(env, ecl, still_used);
1228         RETURN(0);
1229 }
1230
1231 static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
1232 {
1233         struct lu_env *env;
1234         int refcheck;
1235         int rc;
1236         ENTRY;
1237
1238         env = cl_env_get(&refcheck);
1239         if (IS_ERR(env))
1240                 RETURN(PTR_ERR(env));
1241
1242         rc = cl_echo_cancel0(env, ed, cookie);
1243
1244         cl_env_put(env, &refcheck);
1245         RETURN(rc);
1246 }
1247
1248 static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1249                              enum cl_req_type unused, struct cl_2queue *queue)
1250 {
1251         struct cl_page *clp;
1252         struct cl_page *temp;
1253         int result = 0;
1254         ENTRY;
1255
1256         cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1257                 int rc;
1258                 rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1259                 if (rc == 0)
1260                         continue;
1261                 result = result ?: rc;
1262         }
1263         RETURN(result);
1264 }
1265
1266 static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
1267                               cfs_page_t **pages, int npages, int async)
1268 {
1269         struct lu_env           *env;
1270         struct echo_thread_info *info;
1271         struct cl_object        *obj = echo_obj2cl(eco);
1272         struct echo_device      *ed  = eco->eo_dev;
1273         struct cl_2queue        *queue;
1274         struct cl_io            *io;
1275         struct cl_page          *clp;
1276         struct lustre_handle    lh = { 0 };
1277         int page_size = cl_page_size(obj);
1278         int refcheck;
1279         int rc;
1280         int i;
1281         ENTRY;
1282
1283         LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1284         LASSERT(ed->ed_next != NULL);
1285         env = cl_env_get(&refcheck);
1286         if (IS_ERR(env))
1287                 RETURN(PTR_ERR(env));
1288
1289         info    = echo_env_info(env);
1290         io      = &info->eti_io;
1291         queue   = &info->eti_queue;
1292
1293         cl_2queue_init(queue);
1294         rc = cl_io_init(env, io, CIT_MISC, obj);
1295         if (rc < 0)
1296                 GOTO(out, rc);
1297         LASSERT(rc == 0);
1298
1299
1300         rc = cl_echo_enqueue0(env, eco, offset,
1301                               offset + npages * CFS_PAGE_SIZE - 1,
1302                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1303                               CEF_NEVER);
1304         if (rc < 0)
1305                 GOTO(error_lock, rc);
1306
1307         for (i = 0; i < npages; i++) {
1308                 LASSERT(pages[i]);
1309                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1310                                    pages[i], CPT_TRANSIENT);
1311                 if (IS_ERR(clp)) {
1312                         rc = PTR_ERR(clp);
1313                         break;
1314                 }
1315                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1316
1317                 rc = cl_page_own(env, io, clp);
1318                 if (rc) {
1319                         LASSERT(clp->cp_state == CPS_FREEING);
1320                         cl_page_put(env, clp);
1321                         break;
1322                 }
1323
1324                 cl_2queue_add(queue, clp);
1325
1326                 /* drop the reference count for cl_page_find, so that the page
1327                  * will be freed in cl_2queue_fini. */
1328                 cl_page_put(env, clp);
1329                 cl_page_clip(env, clp, 0, page_size);
1330
1331                 offset += page_size;
1332         }
1333
1334         if (rc == 0) {
1335                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1336
1337                 async = async && (typ == CRT_WRITE);
1338                 if (async)
1339                         rc = cl_echo_async_brw(env, io, typ, queue);
1340                 else
1341                         rc = cl_io_submit_sync(env, io, typ, queue,
1342                                                CRP_NORMAL, 0);
1343                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1344                        async ? "async" : "sync", rc);
1345         }
1346
1347         cl_echo_cancel0(env, ed, lh.cookie);
1348         EXIT;
1349 error_lock:
1350         cl_2queue_discard(env, io, queue);
1351         cl_2queue_disown(env, io, queue);
1352         cl_2queue_fini(env, queue);
1353         cl_io_fini(env, io);
1354 out:
1355         cl_env_put(env, &refcheck);
1356         return rc;
1357 }
1358 /** @} echo_exports */
1359
1360
1361 static obd_id last_object_id;
1362
1363 static int
1364 echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1365 {
1366         struct lov_stripe_md *ulsm = _ulsm;
1367         int nob, i;
1368
1369         nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1370         if (nob > ulsm_nob)
1371                 return (-EINVAL);
1372
1373         if (cfs_copy_to_user (ulsm, lsm, sizeof(ulsm)))
1374                 return (-EFAULT);
1375
1376         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1377                 if (cfs_copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
1378                                       sizeof(lsm->lsm_oinfo[0])))
1379                         return (-EFAULT);
1380         }
1381         return 0;
1382 }
1383
1384 static int
1385 echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
1386                  void *ulsm, int ulsm_nob)
1387 {
1388         struct echo_client_obd *ec = ed->ed_ec;
1389         int                     i;
1390
1391         if (ulsm_nob < sizeof (*lsm))
1392                 return (-EINVAL);
1393
1394         if (cfs_copy_from_user (lsm, ulsm, sizeof (*lsm)))
1395                 return (-EFAULT);
1396
1397         if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1398             lsm->lsm_magic != LOV_MAGIC ||
1399             (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1400             ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1401                 return (-EINVAL);
1402
1403
1404         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1405                 if (cfs_copy_from_user(lsm->lsm_oinfo[i],
1406                                        ((struct lov_stripe_md *)ulsm)-> \
1407                                        lsm_oinfo[i],
1408                                        sizeof(lsm->lsm_oinfo[0])))
1409                         return (-EFAULT);
1410         }
1411         return (0);
1412 }
1413
1414 static inline void echo_md_build_name(struct lu_name *lname, char *name,
1415                                       __u64 id)
1416 {
1417         sprintf(name, "%llu", id);
1418         lname->ln_name = name;
1419         lname->ln_namelen = strlen(name);
1420 }
1421
1422 static int echo_md_create_internal(const struct lu_env *env,
1423                                    struct echo_device *ed,
1424                                    struct md_object *parent,
1425                                    struct lu_fid *fid,
1426                                    struct lu_name *lname,
1427                                    struct md_op_spec *spec,
1428                                    struct md_attr *ma)
1429 {
1430         struct lu_object        *ec_child, *child;
1431         struct lu_device        *ld = ed->ed_next;
1432         int                      rc;
1433
1434         ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1435                                      fid, NULL);
1436         if (IS_ERR(ec_child)) {
1437                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1438                         PTR_ERR(ec_child));
1439                 return PTR_ERR(ec_child);
1440         }
1441
1442         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1443         if (child == NULL) {
1444                 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1445                 GOTO(out_put, rc = -EINVAL);
1446         }
1447
1448         CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1449                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1450
1451         rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1452         if (rc) {
1453                 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1454                 GOTO(out_put, rc);
1455         }
1456         CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
1457                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1458 out_put:
1459         lu_object_put(env, ec_child);
1460         return rc;
1461 }
1462
1463 static int echo_set_lmm_size(const struct lu_env *env,
1464                              struct lu_device *ld,
1465                              struct md_attr *ma)
1466 {
1467         struct md_device *md = lu2md_dev(ld);
1468         int lmm_size, cookie_size, rc;
1469         ENTRY;
1470
1471         md = lu2md_dev(ld);
1472         rc = md->md_ops->mdo_maxsize_get(env, md,
1473                                          &lmm_size, &cookie_size);
1474         if (rc)
1475                 RETURN(rc);
1476
1477         ma->ma_lmm_size = lmm_size;
1478         if (lmm_size > 0) {
1479                 OBD_ALLOC(ma->ma_lmm, lmm_size);
1480                 if (ma->ma_lmm == NULL) {
1481                         ma->ma_lmm_size = 0;
1482                         RETURN(-ENOMEM);
1483                 }
1484         }
1485
1486         ma->ma_cookie_size = cookie_size;
1487         if (cookie_size > 0) {
1488                 OBD_ALLOC(ma->ma_cookie, cookie_size);
1489                 if (ma->ma_cookie == NULL) {
1490                         ma->ma_cookie_size = 0;
1491                         RETURN(-ENOMEM);
1492                 }
1493         }
1494
1495         RETURN(0);
1496 }
1497
1498 static int echo_create_md_object(const struct lu_env *env,
1499                                  struct echo_device *ed,
1500                                  struct lu_object *ec_parent,
1501                                  struct lu_fid *fid,
1502                                  char *name, int namelen,
1503                                  __u64 id, __u32 mode, int count,
1504                                  int stripe_count, int stripe_offset)
1505 {
1506         struct lu_object        *parent;
1507         struct echo_thread_info *info = echo_env_info(env);
1508         struct lu_name          *lname = &info->eti_lname;
1509         struct md_op_spec       *spec = &info->eti_spec;
1510         struct md_attr          *ma = &info->eti_ma;
1511         struct lu_device        *ld = ed->ed_next;
1512         int                      rc = 0;
1513         int                      i;
1514
1515         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1516         if (ec_parent == NULL) {
1517                 lu_object_put(env, ec_parent);
1518                 RETURN(PTR_ERR(parent));
1519         }
1520
1521         memset(ma, 0, sizeof(*ma));
1522         memset(spec, 0, sizeof(*spec));
1523         if (stripe_count != 0) {
1524                 spec->sp_cr_flags |= FMODE_WRITE;
1525                 rc = echo_set_lmm_size(env, ld, ma);
1526                 if (rc)
1527                         GOTO(out_free, rc);
1528                 if (stripe_count != -1) {
1529                         struct lov_user_md_v3 *lum = &info->eti_lum;
1530                         lum->lmm_magic = LOV_USER_MAGIC_V3;
1531                         lum->lmm_stripe_count = stripe_count;
1532                         lum->lmm_stripe_offset = stripe_offset;
1533                         lum->lmm_pattern = 0;
1534                         spec->u.sp_ea.eadata = lum;
1535                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1536                 }
1537         }
1538
1539         ma->ma_attr.la_mode = mode;
1540         ma->ma_attr.la_valid = LA_CTIME;
1541         ma->ma_attr.la_ctime = cfs_time_current_64();
1542
1543         if (name != NULL) {
1544                 lname->ln_name = name;
1545                 lname->ln_namelen = namelen;
1546                 /* If name is specified, only create one object by name */
1547                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1548                                              spec, ma);
1549                 GOTO(out_free, rc);
1550         }
1551
1552         /* Create multiple object sequenced by id */
1553         for (i = 0; i < count; i++) {
1554                 char *tmp_name = info->eti_name;
1555
1556                 echo_md_build_name(lname, tmp_name, id);
1557
1558                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1559                                              spec, ma);
1560                 if (rc) {
1561                         CERROR("Can not create child %s: rc = %d\n", tmp_name,
1562                                 rc);
1563                         break;
1564                 }
1565                 id++;
1566                 fid->f_oid++;
1567         }
1568
1569 out_free:
1570         if (ma->ma_lmm_size > 0 && ma->ma_lmm != NULL)
1571                 OBD_FREE(ma->ma_lmm, ma->ma_lmm_size);
1572         if (ma->ma_cookie_size > 0 && ma->ma_cookie != NULL)
1573                 OBD_FREE(ma->ma_cookie, ma->ma_cookie_size);
1574
1575         return rc;
1576 }
1577
1578 static struct lu_object *echo_md_lookup(const struct lu_env *env,
1579                                         struct echo_device *ed,
1580                                         struct md_object *parent,
1581                                         struct lu_name *lname)
1582 {
1583         struct echo_thread_info *info = echo_env_info(env);
1584         struct lu_fid           *fid = &info->eti_fid;
1585         struct lu_object        *child;
1586         int    rc;
1587         ENTRY;
1588
1589         CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1590                PFID(fid), parent);
1591         rc = mdo_lookup(env, parent, lname, fid, NULL);
1592         if (rc) {
1593                 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1594                 RETURN(ERR_PTR(rc));
1595         }
1596
1597         child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1598
1599         RETURN(child);
1600 }
1601
1602 static int echo_setattr_object(const struct lu_env *env,
1603                                struct echo_device *ed,
1604                                struct lu_object *ec_parent,
1605                                __u64 id, int count)
1606 {
1607         struct lu_object        *parent;
1608         struct echo_thread_info *info = echo_env_info(env);
1609         struct lu_name          *lname = &info->eti_lname;
1610         char                    *name = info->eti_name;
1611         struct lu_device        *ld = ed->ed_next;
1612         struct lu_buf           *buf = &info->eti_buf;
1613         int                      rc = 0;
1614         int                      i;
1615
1616         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1617         if (ec_parent == NULL) {
1618                 lu_object_put(env, ec_parent);
1619                 return PTR_ERR(parent);
1620         }
1621
1622         buf->lb_buf = info->eti_xattr_buf;
1623         buf->lb_len = sizeof(info->eti_xattr_buf);
1624         for (i = 0; i < count; i++) {
1625                 struct lu_object *ec_child, *child;
1626
1627                 echo_md_build_name(lname, name, id);
1628
1629                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1630                 if (IS_ERR(ec_child)) {
1631                         CERROR("Can't find child %s: rc = %ld\n",
1632                                 lname->ln_name, PTR_ERR(ec_child));
1633                         RETURN(PTR_ERR(ec_child));
1634                 }
1635
1636                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1637                 if (child == NULL) {
1638                         CERROR("Can not locate the child %s\n", lname->ln_name);
1639                         lu_object_put(env, ec_child);
1640                         rc = -EINVAL;
1641                         break;
1642                 }
1643
1644                 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1645                        PFID(lu_object_fid(child)));
1646
1647                 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1648                 rc = mo_xattr_set(env, lu2md(child), buf, name,
1649                                   LU_XATTR_CREATE);
1650                 if (rc) {
1651                         CERROR("Can not setattr child "DFID": rc = %d\n",
1652                                 PFID(lu_object_fid(child)), rc);
1653                         lu_object_put(env, ec_child);
1654                         break;
1655                 }
1656                 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1657                        PFID(lu_object_fid(child)));
1658                 id++;
1659                 lu_object_put(env, ec_child);
1660         }
1661         return rc;
1662 }
1663
1664 static int echo_getattr_object(const struct lu_env *env,
1665                                struct echo_device *ed,
1666                                struct lu_object *ec_parent,
1667                                __u64 id, int count)
1668 {
1669         struct lu_object        *parent;
1670         struct echo_thread_info *info = echo_env_info(env);
1671         struct lu_name          *lname = &info->eti_lname;
1672         char                    *name = info->eti_name;
1673         struct md_attr          *ma = &info->eti_ma;
1674         struct lu_device        *ld = ed->ed_next;
1675         int                      rc = 0;
1676         int                      i;
1677
1678         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1679         if (ec_parent == NULL) {
1680                 lu_object_put(env, ec_parent);
1681                 return PTR_ERR(parent);
1682         }
1683
1684         memset(ma, 0, sizeof(*ma));
1685         rc = echo_set_lmm_size(env, ld, ma);
1686         if (rc)
1687                 GOTO(out_free, rc);
1688
1689         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1690         ma->ma_acl = info->eti_xattr_buf;
1691         ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1692
1693         for (i = 0; i < count; i++) {
1694                 struct lu_object *ec_child, *child;
1695
1696                 ma->ma_valid = 0;
1697                 echo_md_build_name(lname, name, id);
1698
1699                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1700                 if (IS_ERR(ec_child)) {
1701                         CERROR("Can't find child %s: rc = %ld\n",
1702                                lname->ln_name, PTR_ERR(ec_child));
1703                         RETURN(PTR_ERR(ec_child));
1704                 }
1705
1706                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1707                 if (child == NULL) {
1708                         CERROR("Can not locate the child %s\n", lname->ln_name);
1709                         lu_object_put(env, ec_child);
1710                         GOTO(out_free, rc = -EINVAL);
1711                 }
1712
1713                 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1714                        PFID(lu_object_fid(child)));
1715                 rc = mo_attr_get(env, lu2md(child), ma);
1716                 if (rc) {
1717                         CERROR("Can not getattr child "DFID": rc = %d\n",
1718                                 PFID(lu_object_fid(child)), rc);
1719                         lu_object_put(env, ec_child);
1720                         break;
1721                 }
1722                 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1723                        PFID(lu_object_fid(child)));
1724                 id++;
1725                 lu_object_put(env, ec_child);
1726         }
1727
1728 out_free:
1729         if (ma->ma_lmm_size > 0 && ma->ma_lmm != NULL)
1730                 OBD_FREE(ma->ma_lmm, ma->ma_lmm_size);
1731         if (ma->ma_cookie_size > 0 && ma->ma_cookie != NULL)
1732                 OBD_FREE(ma->ma_cookie, ma->ma_cookie_size);
1733         return rc;
1734 }
1735
1736 static int echo_lookup_object(const struct lu_env *env,
1737                               struct echo_device *ed,
1738                               struct lu_object *ec_parent,
1739                               __u64 id, int count)
1740 {
1741         struct lu_object        *parent;
1742         struct echo_thread_info *info = echo_env_info(env);
1743         struct lu_name          *lname = &info->eti_lname;
1744         char                    *name = info->eti_name;
1745         struct lu_fid           *fid = &info->eti_fid;
1746         struct lu_device        *ld = ed->ed_next;
1747         int                      rc = 0;
1748         int                      i;
1749
1750         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1751         if (ec_parent == NULL) {
1752                 lu_object_put(env, ec_parent);
1753                 return PTR_ERR(parent);
1754         }
1755
1756         /*prepare the requests*/
1757         for (i = 0; i < count; i++) {
1758                 echo_md_build_name(lname, name, id);
1759
1760                 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1761                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1762
1763                 rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
1764                 if (rc) {
1765                         CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1766                         break;
1767                 }
1768                 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1769                        PFID(lu_object_fid(parent)), lname->ln_name, parent);
1770
1771                 id++;
1772         }
1773         return rc;
1774 }
1775
1776 static int echo_md_destroy_internal(const struct lu_env *env,
1777                                     struct echo_device *ed,
1778                                     struct md_object *parent,
1779                                     struct lu_name *lname,
1780                                     struct md_attr *ma)
1781 {
1782         struct lu_device   *ld = ed->ed_next;
1783         struct lu_object   *ec_child;
1784         struct lu_object   *child;
1785         int                 rc;
1786
1787         ec_child = echo_md_lookup(env, ed, parent, lname);
1788         if (IS_ERR(ec_child)) {
1789                 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1790                         PTR_ERR(ec_child));
1791                 RETURN(PTR_ERR(ec_child));
1792         }
1793
1794         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1795         if (child == NULL) {
1796                 CERROR("Can not locate the child %s\n", lname->ln_name);
1797                 GOTO(out_put, rc = -EINVAL);
1798         }
1799
1800         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1801                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1802
1803 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,3,50,0)
1804         /* After 2.4, MDT will send destroy RPC to OST directly, so no need
1805          * this flag */
1806         ma->ma_valid |= MA_FLAGS;
1807         ma->ma_attr_flags |= MDS_UNLINK_DESTROY;
1808 #else
1809 #warning "Please remove this after 2.4 (LOD/OSP)"
1810 #endif
1811         rc = mdo_unlink(env, parent, lu2md(child), lname, ma);
1812         if (rc) {
1813                 CERROR("Can not unlink child %s: rc = %d\n",
1814                         lname->ln_name, rc);
1815                 GOTO(out_put, rc);
1816         }
1817         CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1818                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1819 out_put:
1820         lu_object_put(env, ec_child);
1821         return rc;
1822 }
1823
1824 static int echo_destroy_object(const struct lu_env *env,
1825                                struct echo_device *ed,
1826                                struct lu_object *ec_parent,
1827                                char *name, int namelen,
1828                                __u64 id, __u32 mode,
1829                                int count)
1830 {
1831         struct echo_thread_info *info = echo_env_info(env);
1832         struct lu_name          *lname = &info->eti_lname;
1833         struct md_attr          *ma = &info->eti_ma;
1834         struct lu_device        *ld = ed->ed_next;
1835         struct lu_object        *parent;
1836         int                      rc = 0;
1837         int                      i;
1838         ENTRY;
1839
1840         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1841         if (parent == NULL)
1842                 RETURN(-EINVAL);
1843
1844         memset(ma, 0, sizeof(*ma));
1845         ma->ma_attr.la_mode = mode;
1846         ma->ma_attr.la_valid = LA_CTIME;
1847         ma->ma_attr.la_ctime = cfs_time_current_64();
1848         ma->ma_need = MA_INODE;
1849         ma->ma_valid = 0;
1850
1851         rc = echo_set_lmm_size(env, ld, ma);
1852         if (rc)
1853                 GOTO(out_free, rc);
1854         if (name != NULL) {
1855                 lname->ln_name = name;
1856                 lname->ln_namelen = namelen;
1857                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1858                                               ma);
1859                 GOTO(out_free, rc);
1860         }
1861
1862         /*prepare the requests*/
1863         for (i = 0; i < count; i++) {
1864                 char *tmp_name = info->eti_name;
1865
1866                 ma->ma_need |= MA_LOV;
1867                 ma->ma_valid = 0;
1868                 echo_md_build_name(lname, tmp_name, id);
1869
1870                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1871                                               ma);
1872                 if (rc) {
1873                         CERROR("Can not unlink child %s: rc = %d\n", name, rc);
1874                         break;
1875                 }
1876                 id++;
1877         }
1878
1879 out_free:
1880         if (ma->ma_lmm_size > 0 && ma->ma_lmm != NULL)
1881                 OBD_FREE(ma->ma_lmm, ma->ma_lmm_size);
1882         if (ma->ma_cookie_size > 0 && ma->ma_cookie != NULL)
1883                 OBD_FREE(ma->ma_cookie, ma->ma_cookie_size);
1884         RETURN(rc);
1885 }
1886
1887 struct lu_object *echo_resolve_path(const struct lu_env *env,
1888                                     struct echo_device *ed, char *path,
1889                                     int path_len)
1890 {
1891         struct lu_device        *ld = ed->ed_next;
1892         struct md_device        *md = lu2md_dev(ld);
1893         struct echo_thread_info *info = echo_env_info(env);
1894         struct lu_fid           *fid = &info->eti_fid;
1895         struct lu_name          *lname = &info->eti_lname;
1896         struct lu_object        *parent = NULL;
1897         struct lu_object        *child = NULL;
1898         int rc = 0;
1899         ENTRY;
1900
1901         /*Only support MDD layer right now*/
1902         rc = md->md_ops->mdo_root_get(env, md, fid);
1903         if (rc) {
1904                 CERROR("get root error: rc = %d\n", rc);
1905                 RETURN(ERR_PTR(rc));
1906         }
1907
1908         parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1909         if (IS_ERR(parent)) {
1910                 CERROR("Can not find the parent "DFID": rc = %ld\n",
1911                         PFID(fid), PTR_ERR(parent));
1912                 RETURN(parent);
1913         }
1914
1915         while (1) {
1916                 struct lu_object *ld_parent;
1917                 char *e;
1918
1919                 e = strsep(&path, "/");
1920                 if (e == NULL)
1921                         break;
1922
1923                 if (e[0] == 0) {
1924                         if (!path || path[0] == '\0')
1925                                 break;
1926                         continue;
1927                 }
1928
1929                 lname->ln_name = e;
1930                 lname->ln_namelen = strlen(e);
1931
1932                 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
1933                 if (ld_parent == NULL) {
1934                         lu_object_put(env, parent);
1935                         rc = -EINVAL;
1936                         break;
1937                 }
1938
1939                 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
1940                 lu_object_put(env, parent);
1941                 if (IS_ERR(child)) {
1942                         rc = (int)PTR_ERR(child);
1943                         CERROR("lookup %s under parent "DFID": rc = %d\n",
1944                                 lname->ln_name, PFID(lu_object_fid(ld_parent)),
1945                                 rc);
1946                         break;
1947                 }
1948                 parent = child;
1949         }
1950         if (rc)
1951                 RETURN(ERR_PTR(rc));
1952
1953         RETURN(parent);
1954 }
1955
1956 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_NOREF | LCT_MD_THREAD)
1957 #define ECHO_MD_SES_TAG (LCT_SESSION | LCT_REMEMBER | LCT_NOREF)
1958
1959 static int echo_md_handler(struct echo_device *ed, int command,
1960                            char *path, int path_len, int id, int count,
1961                            struct obd_ioctl_data *data)
1962 {
1963         struct lu_device      *ld = ed->ed_next;
1964         struct lu_env         *env;
1965         int                    refcheck;
1966         struct lu_object      *parent;
1967         char                  *name = NULL;
1968         int                    namelen = data->ioc_plen2;
1969         int                    rc = 0;
1970         ENTRY;
1971
1972         if (ld == NULL) {
1973                 CERROR("MD echo client is not being initialized properly\n");
1974                 RETURN(-EINVAL);
1975         }
1976
1977         if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1978                 CERROR("Only support MDD layer right now!\n");
1979                 RETURN(-EINVAL);
1980         }
1981
1982         env = cl_env_get(&refcheck);
1983         if (IS_ERR(env))
1984                 RETURN(PTR_ERR(env));
1985
1986         rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
1987         if (rc != 0) {
1988                 cl_env_put(env, &refcheck);
1989                 RETURN(rc);
1990         }
1991
1992         parent = echo_resolve_path(env, ed, path, path_len);
1993         if (IS_ERR(parent)) {
1994                 CERROR("Can not resolve the path %s: rc = %ld\n", path,
1995                         PTR_ERR(parent));
1996                 cl_env_put(env, &refcheck);
1997                 RETURN(PTR_ERR(parent));
1998         }
1999
2000         if (namelen > 0) {
2001                 OBD_ALLOC(name, namelen + 1);
2002                 if (name == NULL)
2003                         RETURN(-ENOMEM);
2004                 if (cfs_copy_from_user(name, data->ioc_pbuf2, namelen)) {
2005                         OBD_FREE(name, namelen + 1);
2006                         RETURN(-EFAULT);
2007                 }
2008         }
2009
2010         switch (command) {
2011         case ECHO_MD_CREATE:
2012         case ECHO_MD_MKDIR: {
2013                 struct echo_thread_info *info = echo_env_info(env);
2014                 __u32 mode = data->ioc_obdo2.o_mode;
2015                 struct lu_fid *fid = &info->eti_fid;
2016                 int stripe_count = (int)data->ioc_obdo2.o_misc;
2017                 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2018
2019                 fid->f_seq = data->ioc_obdo1.o_seq;
2020                 fid->f_oid = (__u32)data->ioc_obdo1.o_id;
2021                 fid->f_ver = 0;
2022                 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2023                                            id, mode, count, stripe_count,
2024                                            stripe_index);
2025                 break;
2026         }
2027         case ECHO_MD_DESTROY:
2028         case ECHO_MD_RMDIR: {
2029                 __u32 mode = data->ioc_obdo2.o_mode;
2030
2031                 rc = echo_destroy_object(env, ed, parent, name, namelen,
2032                                          id, mode, count);
2033                 break;
2034         }
2035         case ECHO_MD_LOOKUP:
2036                 rc = echo_lookup_object(env, ed, parent, id, count);
2037                 break;
2038         case ECHO_MD_GETATTR:
2039                 rc = echo_getattr_object(env, ed, parent, id, count);
2040                 break;
2041         case ECHO_MD_SETATTR:
2042                 rc = echo_setattr_object(env, ed, parent, id, count);
2043                 break;
2044         default:
2045                 CERROR("unknown command %d\n", command);
2046                 rc = -EINVAL;
2047                 break;
2048         }
2049         if (name != NULL)
2050                 OBD_FREE(name, namelen + 1);
2051         lu_object_put(env, parent);
2052         cl_env_put(env, &refcheck);
2053         return rc;
2054 }
2055
2056 static int echo_create_object(struct echo_device *ed, int on_target,
2057                               struct obdo *oa, void *ulsm, int ulsm_nob,
2058                               struct obd_trans_info *oti)
2059 {
2060         struct echo_object     *eco;
2061         struct echo_client_obd *ec = ed->ed_ec;
2062         struct lov_stripe_md   *lsm = NULL;
2063         int                     rc;
2064         int                     created = 0;
2065         ENTRY;
2066
2067         if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
2068             (on_target ||                       /* set_stripe */
2069              ec->ec_nstripes != 0)) {           /* LOV */
2070                 CERROR ("No valid oid\n");
2071                 RETURN(-EINVAL);
2072         }
2073
2074         rc = obd_alloc_memmd(ec->ec_exp, &lsm);
2075         if (rc < 0) {
2076                 CERROR("Cannot allocate md: rc = %d\n", rc);
2077                 GOTO(failed, rc);
2078         }
2079
2080         if (ulsm != NULL) {
2081                 int i, idx;
2082
2083                 rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob);
2084                 if (rc != 0)
2085                         GOTO(failed, rc);
2086
2087                 if (lsm->lsm_stripe_count == 0)
2088                         lsm->lsm_stripe_count = ec->ec_nstripes;
2089
2090                 if (lsm->lsm_stripe_size == 0)
2091                         lsm->lsm_stripe_size = CFS_PAGE_SIZE;
2092
2093                 idx = cfs_rand();
2094
2095                 /* setup stripes: indices + default ids if required */
2096                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2097                         if (lsm->lsm_oinfo[i]->loi_id == 0)
2098                                 lsm->lsm_oinfo[i]->loi_id = lsm->lsm_object_id;
2099
2100                         lsm->lsm_oinfo[i]->loi_ost_idx =
2101                                 (idx + i) % ec->ec_nstripes;
2102                 }
2103         }
2104
2105         /* setup object ID here for !on_target and LOV hint */
2106         if (oa->o_valid & OBD_MD_FLID)
2107                 lsm->lsm_object_id = oa->o_id;
2108
2109         if (lsm->lsm_object_id == 0)
2110                 lsm->lsm_object_id = ++last_object_id;
2111
2112         rc = 0;
2113         if (on_target) {
2114                 /* Only echo objects are allowed to be created */
2115                 LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
2116                         (oa->o_seq == FID_SEQ_ECHO));
2117                 rc = obd_create(ec->ec_exp, oa, &lsm, oti);
2118                 if (rc != 0) {
2119                         CERROR("Cannot create objects: rc = %d\n", rc);
2120                         GOTO(failed, rc);
2121                 }
2122                 created = 1;
2123         }
2124
2125         /* See what object ID we were given */
2126         oa->o_id = lsm->lsm_object_id;
2127         oa->o_valid |= OBD_MD_FLID;
2128
2129         eco = cl_echo_object_find(ed, &lsm);
2130         if (IS_ERR(eco))
2131                 GOTO(failed, rc = PTR_ERR(eco));
2132         cl_echo_object_put(eco);
2133
2134         CDEBUG(D_INFO, "oa->o_id = %lx\n", (long)oa->o_id);
2135         EXIT;
2136
2137  failed:
2138         if (created && rc)
2139                 obd_destroy(ec->ec_exp, oa, lsm, oti, NULL, NULL);
2140         if (lsm)
2141                 obd_free_memmd(ec->ec_exp, &lsm);
2142         if (rc)
2143                 CERROR("create object failed with: rc = %d\n", rc);
2144         return (rc);
2145 }
2146
2147 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2148                            struct obdo *oa)
2149 {
2150         struct echo_client_obd *ec  = ed->ed_ec;
2151         struct lov_stripe_md   *lsm = NULL;
2152         struct echo_object     *eco;
2153         int                     rc;
2154         ENTRY;
2155
2156         if ((oa->o_valid & OBD_MD_FLID) == 0 ||
2157             oa->o_id == 0)  /* disallow use of object id 0 */
2158         {
2159                 CERROR ("No valid oid\n");
2160                 RETURN(-EINVAL);
2161         }
2162
2163         rc = obd_alloc_memmd(ec->ec_exp, &lsm);
2164         if (rc < 0)
2165                 RETURN(rc);
2166
2167         lsm->lsm_object_id = oa->o_id;
2168         if (oa->o_valid & OBD_MD_FLGROUP)
2169                 lsm->lsm_object_seq = oa->o_seq;
2170         else
2171                 lsm->lsm_object_seq = FID_SEQ_ECHO;
2172
2173         rc = 0;
2174         eco = cl_echo_object_find(ed, &lsm);
2175         if (!IS_ERR(eco))
2176                 *ecop = eco;
2177         else
2178                 rc = PTR_ERR(eco);
2179         if (lsm)
2180                 obd_free_memmd(ec->ec_exp, &lsm);
2181         RETURN(rc);
2182 }
2183
2184 static void echo_put_object(struct echo_object *eco)
2185 {
2186         if (cl_echo_object_put(eco))
2187                 CERROR("echo client: drop an object failed");
2188 }
2189
2190 static void
2191 echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
2192 {
2193         unsigned long stripe_count;
2194         unsigned long stripe_size;
2195         unsigned long width;
2196         unsigned long woffset;
2197         int           stripe_index;
2198         obd_off       offset;
2199
2200         if (lsm->lsm_stripe_count <= 1)
2201                 return;
2202
2203         offset       = *offp;
2204         stripe_size  = lsm->lsm_stripe_size;
2205         stripe_count = lsm->lsm_stripe_count;
2206
2207         /* width = # bytes in all stripes */
2208         width = stripe_size * stripe_count;
2209
2210         /* woffset = offset within a width; offset = whole number of widths */
2211         woffset = do_div (offset, width);
2212
2213         stripe_index = woffset / stripe_size;
2214
2215         *idp = lsm->lsm_oinfo[stripe_index]->loi_id;
2216         *offp = offset * stripe_size + woffset % stripe_size;
2217 }
2218
2219 static void
2220 echo_client_page_debug_setup(struct lov_stripe_md *lsm,
2221                              cfs_page_t *page, int rw, obd_id id,
2222                              obd_off offset, obd_off count)
2223 {
2224         char    *addr;
2225         obd_off  stripe_off;
2226         obd_id   stripe_id;
2227         int      delta;
2228
2229         /* no partial pages on the client */
2230         LASSERT(count == CFS_PAGE_SIZE);
2231
2232         addr = cfs_kmap(page);
2233
2234         for (delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2235                 if (rw == OBD_BRW_WRITE) {
2236                         stripe_off = offset + delta;
2237                         stripe_id = id;
2238                         echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
2239                 } else {
2240                         stripe_off = 0xdeadbeef00c0ffeeULL;
2241                         stripe_id = 0xdeadbeef00c0ffeeULL;
2242                 }
2243                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2244                                   stripe_off, stripe_id);
2245         }
2246
2247         cfs_kunmap(page);
2248 }
2249
2250 static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
2251                                         cfs_page_t *page, obd_id id,
2252                                         obd_off offset, obd_off count)
2253 {
2254         obd_off stripe_off;
2255         obd_id  stripe_id;
2256         char   *addr;
2257         int     delta;
2258         int     rc;
2259         int     rc2;
2260
2261         /* no partial pages on the client */
2262         LASSERT(count == CFS_PAGE_SIZE);
2263
2264         addr = cfs_kmap(page);
2265
2266         for (rc = delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2267                 stripe_off = offset + delta;
2268                 stripe_id = id;
2269                 echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
2270
2271                 rc2 = block_debug_check("test_brw",
2272                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
2273                                         stripe_off, stripe_id);
2274                 if (rc2 != 0) {
2275                         CERROR ("Error in echo object "LPX64"\n", id);
2276                         rc = rc2;
2277                 }
2278         }
2279
2280         cfs_kunmap(page);
2281         return rc;
2282 }
2283
2284 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2285                             struct echo_object *eco, obd_off offset,
2286                             obd_size count, int async,
2287                             struct obd_trans_info *oti)
2288 {
2289         struct lov_stripe_md   *lsm = eco->eo_lsm;
2290         obd_count               npages;
2291         struct brw_page        *pga;
2292         struct brw_page        *pgp;
2293         cfs_page_t            **pages;
2294         obd_off                 off;
2295         int                     i;
2296         int                     rc;
2297         int                     verify;
2298         int                     gfp_mask;
2299         int                     brw_flags = 0;
2300         ENTRY;
2301
2302         verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
2303                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2304                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2305
2306         gfp_mask = ((oa->o_id & 2) == 0) ? CFS_ALLOC_STD : CFS_ALLOC_HIGHUSER;
2307
2308         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2309         LASSERT(lsm != NULL);
2310         LASSERT(lsm->lsm_object_id == oa->o_id);
2311
2312         if (count <= 0 ||
2313             (count & (~CFS_PAGE_MASK)) != 0)
2314                 RETURN(-EINVAL);
2315
2316         /* XXX think again with misaligned I/O */
2317         npages = count >> CFS_PAGE_SHIFT;
2318
2319         if (rw == OBD_BRW_WRITE)
2320                 brw_flags = OBD_BRW_ASYNC;
2321
2322         OBD_ALLOC(pga, npages * sizeof(*pga));
2323         if (pga == NULL)
2324                 RETURN(-ENOMEM);
2325
2326         OBD_ALLOC(pages, npages * sizeof(*pages));
2327         if (pages == NULL) {
2328                 OBD_FREE(pga, npages * sizeof(*pga));
2329                 RETURN(-ENOMEM);
2330         }
2331
2332         for (i = 0, pgp = pga, off = offset;
2333              i < npages;
2334              i++, pgp++, off += CFS_PAGE_SIZE) {
2335
2336                 LASSERT (pgp->pg == NULL);      /* for cleanup */
2337
2338                 rc = -ENOMEM;
2339                 OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
2340                 if (pgp->pg == NULL)
2341                         goto out;
2342
2343                 pages[i] = pgp->pg;
2344                 pgp->count = CFS_PAGE_SIZE;
2345                 pgp->off = off;
2346                 pgp->flag = brw_flags;
2347
2348                 if (verify)
2349                         echo_client_page_debug_setup(lsm, pgp->pg, rw,
2350                                                      oa->o_id, off, pgp->count);
2351         }
2352
2353         /* brw mode can only be used at client */
2354         LASSERT(ed->ed_next != NULL);
2355         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2356
2357  out:
2358         if (rc != 0 || rw != OBD_BRW_READ)
2359                 verify = 0;
2360
2361         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2362                 if (pgp->pg == NULL)
2363                         continue;
2364
2365                 if (verify) {
2366                         int vrc;
2367                         vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id,
2368                                                            pgp->off, pgp->count);
2369                         if (vrc != 0 && rc == 0)
2370                                 rc = vrc;
2371                 }
2372                 OBD_PAGE_FREE(pgp->pg);
2373         }
2374         OBD_FREE(pga, npages * sizeof(*pga));
2375         OBD_FREE(pages, npages * sizeof(*pages));
2376         RETURN(rc);
2377 }
2378
2379 static int echo_client_prep_commit(struct obd_export *exp, int rw,
2380                                    struct obdo *oa, struct echo_object *eco,
2381                                    obd_off offset, obd_size count,
2382                                    obd_size batch, struct obd_trans_info *oti,
2383                                    int async)
2384 {
2385         struct lov_stripe_md *lsm = eco->eo_lsm;
2386         struct obd_ioobj ioo;
2387         struct niobuf_local *lnb;
2388         struct niobuf_remote *rnb;
2389         obd_off off;
2390         obd_size npages, tot_pages;
2391         int i, ret = 0;
2392         ENTRY;
2393
2394         if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
2395             (lsm != NULL && lsm->lsm_object_id != oa->o_id))
2396                 RETURN(-EINVAL);
2397
2398         npages = batch >> CFS_PAGE_SHIFT;
2399         tot_pages = count >> CFS_PAGE_SHIFT;
2400
2401         OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
2402         OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
2403
2404         if (lnb == NULL || rnb == NULL)
2405                 GOTO(out, ret = -ENOMEM);
2406
2407         obdo_to_ioobj(oa, &ioo);
2408
2409         off = offset;
2410
2411         for(; tot_pages; tot_pages -= npages) {
2412                 int lpages;
2413
2414                 if (tot_pages < npages)
2415                         npages = tot_pages;
2416
2417                 for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) {
2418                         rnb[i].offset = off;
2419                         rnb[i].len = CFS_PAGE_SIZE;
2420                 }
2421
2422                 ioo.ioo_bufcnt = npages;
2423                 oti->oti_transno = 0;
2424
2425                 lpages = npages;
2426                 ret = obd_preprw(rw, exp, oa, 1, &ioo, rnb, &lpages, lnb, oti,
2427                                  NULL);
2428                 if (ret != 0)
2429                         GOTO(out, ret);
2430                 LASSERT(lpages == npages);
2431
2432                 for (i = 0; i < lpages; i++) {
2433                         cfs_page_t *page = lnb[i].page;
2434
2435                         /* read past eof? */
2436                         if (page == NULL && lnb[i].rc == 0)
2437                                 continue;
2438
2439                         if (async)
2440                                 lnb[i].flags |= OBD_BRW_ASYNC;
2441
2442                         if (oa->o_id == ECHO_PERSISTENT_OBJID ||
2443                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2444                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2445                                 continue;
2446
2447                         if (rw == OBD_BRW_WRITE)
2448                                 echo_client_page_debug_setup(lsm, page, rw,
2449                                                              oa->o_id,
2450                                                              rnb[i].offset,
2451                                                              rnb[i].len);
2452                         else
2453                                 echo_client_page_debug_check(lsm, page,
2454                                                              oa->o_id,
2455                                                              rnb[i].offset,
2456                                                              rnb[i].len);
2457                 }
2458
2459                 ret = obd_commitrw(rw, exp, oa, 1,&ioo,rnb,npages,lnb,oti,ret);
2460                 if (ret != 0)
2461                         GOTO(out, ret);
2462
2463                 /* Reset oti otherwise it would confuse ldiskfs. */
2464                 memset(oti, 0, sizeof(*oti));
2465         }
2466
2467 out:
2468         if (lnb)
2469                 OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
2470         if (rnb)
2471                 OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
2472         RETURN(ret);
2473 }
2474
2475 static int echo_client_brw_ioctl(int rw, struct obd_export *exp,
2476                                  struct obd_ioctl_data *data)
2477 {
2478         struct obd_device *obd = class_exp2obd(exp);
2479         struct echo_device *ed = obd2echo_dev(obd);
2480         struct echo_client_obd *ec = ed->ed_ec;
2481         struct obd_trans_info dummy_oti = { 0 };
2482         struct obdo *oa = &data->ioc_obdo1;
2483         struct echo_object *eco;
2484         int rc;
2485         int async = 1;
2486         long test_mode;
2487         ENTRY;
2488
2489         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2490
2491         rc = echo_get_object(&eco, ed, oa);
2492         if (rc)
2493                 RETURN(rc);
2494
2495         oa->o_valid &= ~OBD_MD_FLHANDLE;
2496
2497         /* obdfilter doesn't support obd_brw now, simulate via prep + commit */
2498         test_mode = (long)data->ioc_pbuf1;
2499         if (test_mode == 1)
2500                 async = 0;
2501
2502         if (ed->ed_next == NULL && test_mode != 3) {
2503                 test_mode = 3;
2504                 data->ioc_plen1 = data->ioc_count;
2505         }
2506
2507         /* Truncate batch size to maximum */
2508         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2509                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2510
2511         switch (test_mode) {
2512         case 1:
2513                 /* fall through */
2514         case 2:
2515                 rc = echo_client_kbrw(ed, rw, oa,
2516                                       eco, data->ioc_offset,
2517                                       data->ioc_count, async, &dummy_oti);
2518                 break;
2519         case 3:
2520                 rc = echo_client_prep_commit(ec->ec_exp, rw, oa,
2521                                             eco, data->ioc_offset,
2522                                             data->ioc_count, data->ioc_plen1,
2523                                             &dummy_oti, async);
2524                 break;
2525         default:
2526                 rc = -EINVAL;
2527         }
2528         echo_put_object(eco);
2529         RETURN(rc);
2530 }
2531
2532 static int
2533 echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
2534                     int mode, obd_off offset, obd_size nob)
2535 {
2536         struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
2537         struct lustre_handle   *ulh = &oa->o_handle;
2538         struct echo_object     *eco;
2539         obd_off                 end;
2540         int                     rc;
2541         ENTRY;
2542
2543         if (ed->ed_next == NULL)
2544                 RETURN(-EOPNOTSUPP);
2545
2546         if (!(mode == LCK_PR || mode == LCK_PW))
2547                 RETURN(-EINVAL);
2548
2549         if ((offset & (~CFS_PAGE_MASK)) != 0 ||
2550             (nob & (~CFS_PAGE_MASK)) != 0)
2551                 RETURN(-EINVAL);
2552
2553         rc = echo_get_object (&eco, ed, oa);
2554         if (rc != 0)
2555                 RETURN(rc);
2556
2557         end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
2558         rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
2559         if (rc == 0) {
2560                 oa->o_valid |= OBD_MD_FLHANDLE;
2561                 CDEBUG(D_INFO, "Cookie is "LPX64"\n", ulh->cookie);
2562         }
2563         echo_put_object(eco);
2564         RETURN(rc);
2565 }
2566
2567 static int
2568 echo_client_cancel(struct obd_export *exp, struct obdo *oa)
2569 {
2570         struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
2571         __u64               cookie = oa->o_handle.cookie;
2572
2573         if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
2574                 return -EINVAL;
2575
2576         CDEBUG(D_INFO, "Cookie is "LPX64"\n", cookie);
2577         return cl_echo_cancel(ed, cookie);
2578 }
2579
2580 static int
2581 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
2582                       int len, void *karg, void *uarg)
2583 {
2584         struct obd_device      *obd = exp->exp_obd;
2585         struct echo_device     *ed = obd2echo_dev(obd);
2586         struct echo_client_obd *ec = ed->ed_ec;
2587         struct echo_object     *eco;
2588         struct obd_ioctl_data  *data = karg;
2589         struct obd_trans_info   dummy_oti;
2590         struct oti_req_ack_lock *ack_lock;
2591         struct obdo            *oa;
2592         struct lu_fid           fid;
2593         int                     rw = OBD_BRW_READ;
2594         int                     rc = 0;
2595         int                     i;
2596         ENTRY;
2597
2598 #ifndef HAVE_UNLOCKED_IOCTL
2599         cfs_unlock_kernel();
2600 #endif
2601
2602         memset(&dummy_oti, 0, sizeof(dummy_oti));
2603
2604         oa = &data->ioc_obdo1;
2605         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2606                 oa->o_valid |= OBD_MD_FLGROUP;
2607                 oa->o_seq = FID_SEQ_ECHO;
2608         }
2609
2610         /* This FID is unpacked just for validation at this point */
2611         rc = fid_ostid_unpack(&fid, &oa->o_oi, 0);
2612         if (rc < 0)
2613                 RETURN(rc);
2614
2615         switch (cmd) {
2616         case OBD_IOC_CREATE:                    /* may create echo object */
2617                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2618                         GOTO (out, rc = -EPERM);
2619
2620                 rc = echo_create_object (ed, 1, oa,
2621                                          data->ioc_pbuf1, data->ioc_plen1,
2622                                          &dummy_oti);
2623                 GOTO(out, rc);
2624
2625         case OBD_IOC_ECHO_MD: {
2626                 int count;
2627                 int cmd;
2628                 char *dir = NULL;
2629                 int dirlen;
2630                 __u64 id;
2631
2632                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2633                         GOTO(out, rc = -EPERM);
2634
2635                 count = data->ioc_count;
2636                 cmd = data->ioc_command;
2637
2638                 id = data->ioc_obdo2.o_id;
2639
2640                 dirlen = data->ioc_plen1;
2641                 OBD_ALLOC(dir, dirlen + 1);
2642                 if (dir == NULL)
2643                         GOTO(out, rc = -ENOMEM);
2644
2645                 if (cfs_copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2646                         OBD_FREE(dir, data->ioc_plen1 + 1);
2647                         GOTO(out, rc = -EFAULT);
2648                 }
2649
2650                 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2651                 OBD_FREE(dir, dirlen + 1);
2652                 GOTO(out, rc);
2653         }
2654         case OBD_IOC_ECHO_ALLOC_SEQ: {
2655                 struct lu_env   *env;
2656                 int              refcheck;
2657                 __u64            seq;
2658                 int              max_count;
2659
2660                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2661                         GOTO(out, rc = -EPERM);
2662
2663                 env = cl_env_get(&refcheck);
2664                 if (IS_ERR(env))
2665                         GOTO(out, rc = PTR_ERR(env));
2666
2667                 rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG,
2668                                             ECHO_MD_SES_TAG);
2669                 if (rc != 0) {
2670                         cl_env_put(env, &refcheck);
2671                         GOTO(out, rc);
2672                 }
2673
2674                 rc = seq_client_get_seq(env, ed->ed_cl_seq, &seq);
2675                 cl_env_put(env, &refcheck);
2676                 if (rc < 0) {
2677                         CERROR("%s: Can not alloc seq: rc = %d\n",
2678                                obd->obd_name, rc);
2679                         GOTO(out, rc);
2680                 }
2681
2682                 if (cfs_copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2683                         return -EFAULT;
2684
2685                 max_count = LUSTRE_SEQ_MAX_WIDTH;
2686                 if (cfs_copy_to_user(data->ioc_pbuf2, &max_count,
2687                                      data->ioc_plen2))
2688                         return -EFAULT;
2689                 GOTO(out, rc);
2690         }
2691         case OBD_IOC_DESTROY:
2692                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2693                         GOTO (out, rc = -EPERM);
2694
2695                 rc = echo_get_object (&eco, ed, oa);
2696                 if (rc == 0) {
2697                         rc = obd_destroy(ec->ec_exp, oa, eco->eo_lsm,
2698                                          &dummy_oti, NULL, NULL);
2699                         if (rc == 0)
2700                                 eco->eo_deleted = 1;
2701                         echo_put_object(eco);
2702                 }
2703                 GOTO(out, rc);
2704
2705         case OBD_IOC_GETATTR:
2706                 rc = echo_get_object (&eco, ed, oa);
2707                 if (rc == 0) {
2708                         struct obd_info oinfo = { { { 0 } } };
2709                         oinfo.oi_md = eco->eo_lsm;
2710                         oinfo.oi_oa = oa;
2711                         rc = obd_getattr(ec->ec_exp, &oinfo);
2712                         echo_put_object(eco);
2713                 }
2714                 GOTO(out, rc);
2715
2716         case OBD_IOC_SETATTR:
2717                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2718                         GOTO (out, rc = -EPERM);
2719
2720                 rc = echo_get_object (&eco, ed, oa);
2721                 if (rc == 0) {
2722                         struct obd_info oinfo = { { { 0 } } };
2723                         oinfo.oi_oa = oa;
2724                         oinfo.oi_md = eco->eo_lsm;
2725
2726                         rc = obd_setattr(ec->ec_exp, &oinfo, NULL);
2727                         echo_put_object(eco);
2728                 }
2729                 GOTO(out, rc);
2730
2731         case OBD_IOC_BRW_WRITE:
2732                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2733                         GOTO (out, rc = -EPERM);
2734
2735                 rw = OBD_BRW_WRITE;
2736                 /* fall through */
2737         case OBD_IOC_BRW_READ:
2738                 rc = echo_client_brw_ioctl(rw, exp, data);
2739                 GOTO(out, rc);
2740
2741         case ECHO_IOC_GET_STRIPE:
2742                 rc = echo_get_object(&eco, ed, oa);
2743                 if (rc == 0) {
2744                         rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
2745                                               data->ioc_plen1);
2746                         echo_put_object(eco);
2747                 }
2748                 GOTO(out, rc);
2749
2750         case ECHO_IOC_SET_STRIPE:
2751                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2752                         GOTO (out, rc = -EPERM);
2753
2754                 if (data->ioc_pbuf1 == NULL) {  /* unset */
2755                         rc = echo_get_object(&eco, ed, oa);
2756                         if (rc == 0) {
2757                                 eco->eo_deleted = 1;
2758                                 echo_put_object(eco);
2759                         }
2760                 } else {
2761                         rc = echo_create_object(ed, 0, oa,
2762                                                 data->ioc_pbuf1,
2763                                                 data->ioc_plen1, &dummy_oti);
2764                 }
2765                 GOTO (out, rc);
2766
2767         case ECHO_IOC_ENQUEUE:
2768                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2769                         GOTO (out, rc = -EPERM);
2770
2771                 rc = echo_client_enqueue(exp, oa,
2772                                          data->ioc_conn1, /* lock mode */
2773                                          data->ioc_offset,
2774                                          data->ioc_count);/*extent*/
2775                 GOTO (out, rc);
2776
2777         case ECHO_IOC_CANCEL:
2778                 rc = echo_client_cancel(exp, oa);
2779                 GOTO (out, rc);
2780
2781         default:
2782                 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2783                 GOTO (out, rc = -ENOTTY);
2784         }
2785
2786         EXIT;
2787  out:
2788
2789         /* XXX this should be in a helper also called by target_send_reply */
2790         for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
2791              i++, ack_lock++) {
2792                 if (!ack_lock->mode)
2793                         break;
2794                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
2795         }
2796
2797 #ifndef HAVE_UNLOCKED_IOCTL
2798         cfs_lock_kernel();
2799 #endif
2800
2801         return rc;
2802 }
2803
2804 static int echo_client_setup(const struct lu_env *env,
2805                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2806 {
2807         struct echo_client_obd *ec = &obddev->u.echo_client;
2808         struct obd_device *tgt;
2809         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2810         struct obd_connect_data *ocd = NULL;
2811         int rc;
2812         ENTRY;
2813
2814         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2815                 CERROR("requires a TARGET OBD name\n");
2816                 RETURN(-EINVAL);
2817         }
2818
2819         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2820         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2821                 CERROR("device not attached or not set up (%s)\n",
2822                        lustre_cfg_string(lcfg, 1));
2823                 RETURN(-EINVAL);
2824         }
2825
2826         cfs_spin_lock_init (&ec->ec_lock);
2827         CFS_INIT_LIST_HEAD (&ec->ec_objects);
2828         CFS_INIT_LIST_HEAD (&ec->ec_locks);
2829         ec->ec_unique = 0;
2830         ec->ec_nstripes = 0;
2831
2832         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
2833                 lu_context_tags_update(ECHO_MD_CTX_TAG);
2834                 lu_session_tags_update(ECHO_MD_SES_TAG);
2835                 RETURN(0);
2836         }
2837
2838         OBD_ALLOC(ocd, sizeof(*ocd));
2839         if (ocd == NULL) {
2840                 CERROR("Can't alloc ocd connecting to %s\n",
2841                        lustre_cfg_string(lcfg, 1));
2842                 return -ENOMEM;
2843         }
2844
2845         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2846                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2847                                  OBD_CONNECT_64BITHASH;
2848         ocd->ocd_version = LUSTRE_VERSION_CODE;
2849         ocd->ocd_group = FID_SEQ_ECHO;
2850
2851         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2852         if (rc == 0) {
2853                 /* Turn off pinger because it connects to tgt obd directly. */
2854                 cfs_spin_lock(&tgt->obd_dev_lock);
2855                 cfs_list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2856                 cfs_spin_unlock(&tgt->obd_dev_lock);
2857         }
2858
2859         OBD_FREE(ocd, sizeof(*ocd));
2860
2861         if (rc != 0) {
2862                 CERROR("fail to connect to device %s\n",
2863                        lustre_cfg_string(lcfg, 1));
2864                 return (rc);
2865         }
2866
2867         RETURN(rc);
2868 }
2869
2870 static int echo_client_cleanup(struct obd_device *obddev)
2871 {
2872         struct echo_device *ed = obd2echo_dev(obddev);
2873         struct echo_client_obd *ec = &obddev->u.echo_client;
2874         int rc;
2875         ENTRY;
2876
2877         /*Do nothing for Metadata echo client*/
2878         if (ed == NULL )
2879                 RETURN(0);
2880
2881         if (ed->ed_next_ismd) {
2882                 lu_context_tags_clear(ECHO_MD_CTX_TAG);
2883                 lu_session_tags_clear(ECHO_MD_SES_TAG);
2884                 RETURN(0);
2885         }
2886
2887         if (!cfs_list_empty(&obddev->obd_exports)) {
2888                 CERROR("still has clients!\n");
2889                 RETURN(-EBUSY);
2890         }
2891
2892         LASSERT(cfs_atomic_read(&ec->ec_exp->exp_refcount) > 0);
2893         rc = obd_disconnect(ec->ec_exp);
2894         if (rc != 0)
2895                 CERROR("fail to disconnect device: %d\n", rc);
2896
2897         RETURN(rc);
2898 }
2899
2900 static int echo_client_connect(const struct lu_env *env,
2901                                struct obd_export **exp,
2902                                struct obd_device *src, struct obd_uuid *cluuid,
2903                                struct obd_connect_data *data, void *localdata)
2904 {
2905         int                rc;
2906         struct lustre_handle conn = { 0 };
2907
2908         ENTRY;
2909         rc = class_connect(&conn, src, cluuid);
2910         if (rc == 0) {
2911                 *exp = class_conn2export(&conn);
2912         }
2913
2914         RETURN (rc);
2915 }
2916
2917 static int echo_client_disconnect(struct obd_export *exp)
2918 {
2919 #if 0
2920         struct obd_device      *obd;
2921         struct echo_client_obd *ec;
2922         struct ec_lock         *ecl;
2923 #endif
2924         int                     rc;
2925         ENTRY;
2926
2927         if (exp == NULL)
2928                 GOTO(out, rc = -EINVAL);
2929
2930 #if 0
2931         obd = exp->exp_obd;
2932         ec = &obd->u.echo_client;
2933
2934         /* no more contention on export's lock list */
2935         while (!cfs_list_empty (&exp->exp_ec_data.eced_locks)) {
2936                 ecl = cfs_list_entry (exp->exp_ec_data.eced_locks.next,
2937                                       struct ec_lock, ecl_exp_chain);
2938                 cfs_list_del (&ecl->ecl_exp_chain);
2939
2940                 rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
2941                                  ecl->ecl_mode, &ecl->ecl_lock_handle);
2942
2943                 CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
2944                         "(%d)\n", ecl->ecl_object->eco_id, rc);
2945
2946                 echo_put_object (ecl->ecl_object);
2947                 OBD_FREE (ecl, sizeof (*ecl));
2948         }
2949 #endif
2950
2951         rc = class_disconnect(exp);
2952         GOTO(out, rc);
2953  out:
2954         return rc;
2955 }
2956
2957 static struct obd_ops echo_obd_ops = {
2958         .o_owner       = THIS_MODULE,
2959
2960 #if 0
2961         .o_setup       = echo_client_setup,
2962         .o_cleanup     = echo_client_cleanup,
2963 #endif
2964
2965         .o_iocontrol   = echo_client_iocontrol,
2966         .o_connect     = echo_client_connect,
2967         .o_disconnect  = echo_client_disconnect
2968 };
2969
2970 int echo_client_init(void)
2971 {
2972         struct lprocfs_static_vars lvars = { 0 };
2973         int rc;
2974
2975         lprocfs_echo_init_vars(&lvars);
2976
2977         rc = lu_kmem_init(echo_caches);
2978         if (rc == 0) {
2979                 rc = class_register_type(&echo_obd_ops, NULL,
2980                                          lvars.module_vars,
2981                                          LUSTRE_ECHO_CLIENT_NAME,
2982                                          &echo_device_type);
2983                 if (rc)
2984                         lu_kmem_fini(echo_caches);
2985         }
2986         return rc;
2987 }
2988
2989 void echo_client_exit(void)
2990 {
2991         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2992         lu_kmem_fini(echo_caches);
2993 }
2994
2995 /** @} echo_client */