Whamcloud - gitweb
LU-834 echo_client: fix page_is_vmlocked
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 #define DEBUG_SUBSYSTEM S_ECHO
41 #ifdef __KERNEL__
42 #include <libcfs/libcfs.h>
43 #else
44 #include <liblustre.h>
45 #endif
46
47 #include <obd.h>
48 #include <obd_support.h>
49 #include <obd_class.h>
50 #include <lustre_debug.h>
51 #include <lprocfs_status.h>
52 #include <cl_object.h>
53
54 #include "echo_internal.h"
55
56 /** \defgroup echo_client Echo Client
57  * @{
58  */
59
/**
 * Echo client device: a cl_device plus the bookkeeping needed to stack it
 * on top of another device.
 */
struct echo_device {
        /* embedded cl device; cl2echo_dev() maps it back to this struct */
        struct cl_device        ed_cl;
        /* echo client obd state, assigned in echo_device_alloc() */
        struct echo_client_obd *ed_ec;

        /* storage for this device's own site, see echo_site_init() */
        struct cl_site          ed_site_myself;
        /* set to &ed_site_myself by echo_site_init(), NULL after fini */
        struct cl_site         *ed_site;
        /* lower clio device, or NULL when there is none */
        struct lu_device       *ed_next;
        /* set to 1 by echo_device_alloc() when the target is a LOV */
        int                     ed_next_islov;
};
69
/**
 * Echo client object: a cl_object together with its own object header.
 */
struct echo_object {
        /* embedded cl object; cl2echo_obj() maps it back to this struct */
        struct cl_object        eo_cl;
        /* object header, initialized in echo_object_alloc() */
        struct cl_object_header eo_hdr;

        /* owning echo device, set in echo_object_init() */
        struct echo_device     *eo_dev;
        /* linkage on echo_client_obd::ec_objects */
        cfs_list_t              eo_obj_chain;
        /* stripe md taken over in echo_object_init(), freed on object free */
        struct lov_stripe_md   *eo_lsm;
        /* number of echo_page slices currently attached to this object */
        cfs_atomic_t            eo_npages;
        /* set by echo_device_free(); checked on object find and put */
        int                     eo_deleted;
};
80
/**
 * Object configuration handed to cl_object_find() by cl_echo_object_find().
 */
struct echo_object_conf {
        /* generic cl configuration; cl2echo_conf() maps it back */
        struct cl_object_conf  eoc_cl;
        /* caller's lsm pointer; echo_object_init() takes *eoc_md and NULLs it */
        struct lov_stripe_md **eoc_md;
};
85
/**
 * Echo layer slice of a cl_page.
 */
struct echo_page {
        /* embedded page slice; cl2echo_page() maps it back to this struct */
        struct cl_page_slice   ep_cl;
        /* taken by echo_page_own(), dropped by echo_page_disown() */
        cfs_mutex_t            ep_lock;
        /* VM page; a reference is held from echo_page_init() to _fini() */
        cfs_page_t            *ep_vmpage;
};
91
/**
 * Echo layer slice of a cl_lock.
 */
struct echo_lock {
        /* embedded lock slice; cl2echo_lock() maps it back to this struct */
        struct cl_lock_slice   el_cl;
        /* list linkage; must be empty when the lock is deleted or freed.
         * NOTE(review): presumably chained on echo_client_obd::ec_locks --
         * confirm against the enqueue code. */
        cfs_list_t             el_chain;
        /* object this lock was created for, set in echo_lock_init() */
        struct echo_object    *el_object;
        /* NOTE(review): not used in this part of the file; presumably the
         * handle returned by cl_echo_enqueue() -- confirm. */
        __u64                  el_cookie;
        /* initialized to 0 in echo_lock_init() */
        cfs_atomic_t           el_refcount;
};
99
/**
 * Echo layer slice of a cl_io; carries no echo-specific state.
 */
struct echo_io {
        struct cl_io_slice     ei_cl;
};
103
/* Request slice for the echo layer; compiled out, kept for reference only. */
#if 0
struct echo_req {
        struct cl_req_slice er_cl;
};
#endif
109
/*
 * Forward declarations: obd-level setup and cleanup of the echo client,
 * called from echo_device_alloc() and echo_device_free().
 */
static int echo_client_setup(struct obd_device *obddev,
                             struct lustre_cfg *lcfg);
static int echo_client_cleanup(struct obd_device *obddev);
113
114
115 /** \defgroup echo_helpers Helper functions
116  * @{
117  */
118 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
119 {
120         return container_of0(dev, struct echo_device, ed_cl);
121 }
122
123 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
124 {
125         return &d->ed_cl;
126 }
127
128 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
129 {
130         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
131 }
132
133 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
134 {
135         return &eco->eo_cl;
136 }
137
138 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
139 {
140         return container_of(o, struct echo_object, eo_cl);
141 }
142
143 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
144 {
145         return container_of(s, struct echo_page, ep_cl);
146 }
147
148 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
149 {
150         return container_of(s, struct echo_lock, el_cl);
151 }
152
153 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
154 {
155         return ecl->el_cl.cls_lock;
156 }
157
158 static struct lu_context_key echo_thread_key;
159 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
160 {
161         struct echo_thread_info *info;
162         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
163         LASSERT(info != NULL);
164         return info;
165 }
166
167 static inline
168 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
169 {
170         return container_of(c, struct echo_object_conf, eoc_cl);
171 }
172
173 static inline void lsm2fid(struct lov_stripe_md *lsm, struct lu_fid *fid)
174 {
175         fid_zero(fid);
176         fid->f_seq = FID_SEQ_ECHO;
177         /* truncated to 32 bits by assignment */
178         fid->f_oid = lsm->lsm_object_id;
179         fid->f_ver = lsm->lsm_object_id >> 32;
180 }
181 /** @} echo_helpers */
182
/*
 * Forward declarations of the cl_echo_*() entry points: object lookup and
 * release, lock enqueue and cancel, and bulk read/write.
 */
static struct echo_object *cl_echo_object_find(struct echo_device *d,
                                               struct lov_stripe_md **lsm);
static int cl_echo_object_put(struct echo_object *eco);
static int cl_echo_enqueue   (struct echo_object *eco, obd_off start,
                              obd_off end, int mode, __u64 *cookie);
static int cl_echo_cancel    (struct echo_device *d, __u64 cookie);
static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
                              cfs_page_t **pages, int npages, int async);

/* Repeats the declaration of echo_env_info(), which is defined above. */
static struct echo_thread_info *echo_env_info(const struct lu_env *env);
193
/**
 * Per-thread scratch space obtained through echo_env_info(), so these
 * structures need not live on the stack.
 */
struct echo_thread_info {
        /* object configuration passed to cl_object_find() */
        struct echo_object_conf eti_conf;
        /* metadata wrapper used when the lower device is a LOV */
        struct lustre_md        eti_md;

        /* NOTE(review): the next three are not used in this part of the
         * file; presumably scratch for the I/O and lock paths -- confirm. */
        struct cl_2queue        eti_queue;
        struct cl_io            eti_io;
        struct cl_lock_descr    eti_descr;
        /* fid built by lsm2fid() for object lookup */
        struct lu_fid           eti_fid;
};
203
/*
 * No session used right now: this placeholder only gives the session key
 * something to allocate from echo_session_kmem.
 */
struct echo_session_info {
        unsigned long dummy;
};
208
/* Slab caches for echo client structures; each is described in echo_caches[]. */
static cfs_mem_cache_t *echo_page_kmem;
static cfs_mem_cache_t *echo_lock_kmem;
static cfs_mem_cache_t *echo_object_kmem;
static cfs_mem_cache_t *echo_thread_kmem;
static cfs_mem_cache_t *echo_session_kmem;
/* disabled together with struct echo_req */
//static cfs_mem_cache_t *echo_req_kmem;
215
/*
 * Descriptors (cache pointer, name, object size) for the slab caches above.
 * The array ends with an entry whose ckd_cache is NULL.
 * NOTE(review): the code that consumes this table is not in this part of
 * the file -- presumably the module init/exit path; confirm.
 */
static struct lu_kmem_descr echo_caches[] = {
        {
                .ckd_cache = &echo_page_kmem,
                .ckd_name  = "echo_page_kmem",
                .ckd_size  = sizeof (struct echo_page)
        },
        {
                .ckd_cache = &echo_lock_kmem,
                .ckd_name  = "echo_lock_kmem",
                .ckd_size  = sizeof (struct echo_lock)
        },
        {
                .ckd_cache = &echo_object_kmem,
                .ckd_name  = "echo_object_kmem",
                .ckd_size  = sizeof (struct echo_object)
        },
        {
                .ckd_cache = &echo_thread_kmem,
                .ckd_name  = "echo_thread_kmem",
                .ckd_size  = sizeof (struct echo_thread_info)
        },
        {
                .ckd_cache = &echo_session_kmem,
                .ckd_name  = "echo_session_kmem",
                .ckd_size  = sizeof (struct echo_session_info)
        },
#if 0
        {
                .ckd_cache = &echo_req_kmem,
                .ckd_name  = "echo_req_kmem",
                .ckd_size  = sizeof (struct echo_req)
        },
#endif
        {
                /* terminator */
                .ckd_cache = NULL
        }
};
253
254 /** \defgroup echo_page Page operations
255  *
256  * Echo page operations.
257  *
258  * @{
259  */
260 cfs_page_t *echo_page_vmpage(const struct lu_env *env,
261                              const struct cl_page_slice *slice)
262 {
263         return cl2echo_page(slice)->ep_vmpage;
264 }
265
266 static int echo_page_own(const struct lu_env *env,
267                          const struct cl_page_slice *slice,
268                          struct cl_io *io, int nonblock)
269 {
270         struct echo_page *ep = cl2echo_page(slice);
271
272         if (!nonblock)
273                 cfs_mutex_lock(&ep->ep_lock);
274         else if (!cfs_mutex_trylock(&ep->ep_lock))
275                 return -EAGAIN;
276         return 0;
277 }
278
279 static void echo_page_disown(const struct lu_env *env,
280                              const struct cl_page_slice *slice,
281                              struct cl_io *io)
282 {
283         struct echo_page *ep = cl2echo_page(slice);
284
285         LASSERT(cfs_mutex_is_locked(&ep->ep_lock));
286         cfs_mutex_unlock(&ep->ep_lock);
287 }
288
289 static void echo_page_discard(const struct lu_env *env,
290                               const struct cl_page_slice *slice,
291                               struct cl_io *unused)
292 {
293         cl_page_delete(env, slice->cpl_page);
294 }
295
296 static int echo_page_is_vmlocked(const struct lu_env *env,
297                                  const struct cl_page_slice *slice)
298 {
299         if (cfs_mutex_is_locked(&cl2echo_page(slice)->ep_lock))
300                 return -EBUSY;
301         return -ENODATA;
302 }
303
304 static void echo_page_completion(const struct lu_env *env,
305                                  const struct cl_page_slice *slice,
306                                  int ioret)
307 {
308         LASSERT(slice->cpl_page->cp_sync_io != NULL);
309 }
310
311 static void echo_page_fini(const struct lu_env *env,
312                            struct cl_page_slice *slice)
313 {
314         struct echo_page *ep    = cl2echo_page(slice);
315         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
316         cfs_page_t *vmpage      = ep->ep_vmpage;
317         ENTRY;
318
319         cfs_atomic_dec(&eco->eo_npages);
320         page_cache_release(vmpage);
321         OBD_SLAB_FREE_PTR(ep, echo_page_kmem);
322         EXIT;
323 }
324
/**
 * cl_page_operations::cpo_prep for echo reads and writes.
 *
 * \retval 0 always; no preparation is performed
 */
static int echo_page_prep(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          struct cl_io *unused)
{
        int rc = 0;

        return rc;
}
331
332 static int echo_page_print(const struct lu_env *env,
333                            const struct cl_page_slice *slice,
334                            void *cookie, lu_printer_t printer)
335 {
336         struct echo_page *ep = cl2echo_page(slice);
337
338         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
339                    ep, cfs_mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
340         return 0;
341 }
342
/*
 * Page operations of the echo layer. Reads and writes share the same
 * prep and completion handlers.
 */
static const struct cl_page_operations echo_page_ops = {
        .cpo_own           = echo_page_own,
        .cpo_disown        = echo_page_disown,
        .cpo_discard       = echo_page_discard,
        .cpo_vmpage        = echo_page_vmpage,
        .cpo_fini          = echo_page_fini,
        .cpo_print         = echo_page_print,
        .cpo_is_vmlocked   = echo_page_is_vmlocked,
        .io = {
                [CRT_READ] = {
                        .cpo_prep        = echo_page_prep,
                        .cpo_completion  = echo_page_completion,
                },
                [CRT_WRITE] = {
                        .cpo_prep        = echo_page_prep,
                        .cpo_completion  = echo_page_completion,
                }
        }
};
362 /** @} echo_page */
363
364 /** \defgroup echo_lock Locking
365  *
366  * echo lock operations
367  *
368  * @{
369  */
370 static void echo_lock_fini(const struct lu_env *env,
371                            struct cl_lock_slice *slice)
372 {
373         struct echo_lock *ecl = cl2echo_lock(slice);
374
375         LASSERT(cfs_list_empty(&ecl->el_chain));
376         OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
377 }
378
379 static void echo_lock_delete(const struct lu_env *env,
380                              const struct cl_lock_slice *slice)
381 {
382         struct echo_lock *ecl      = cl2echo_lock(slice);
383
384         LASSERT(cfs_list_empty(&ecl->el_chain));
385 }
386
/**
 * cl_lock_operations::clo_fits_into for the echo layer.
 *
 * \retval 1 always; the echo layer never rejects a lock description
 */
static int echo_lock_fits_into(const struct lu_env *env,
                               const struct cl_lock_slice *slice,
                               const struct cl_lock_descr *need,
                               const struct cl_io *unused)
{
        int fits = 1;

        return fits;
}
394
/* Lock operations of the echo layer, installed by echo_lock_init(). */
static struct cl_lock_operations echo_lock_ops = {
        .clo_fini      = echo_lock_fini,
        .clo_delete    = echo_lock_delete,
        .clo_fits_into = echo_lock_fits_into
};
400
401 /** @} echo_lock */
402
403 /** \defgroup echo_cl_ops cl_object operations
404  *
405  * operations for cl_object
406  *
407  * @{
408  */
409 static struct cl_page *echo_page_init(const struct lu_env *env,
410                                       struct cl_object *obj,
411                                       struct cl_page *page, cfs_page_t *vmpage)
412 {
413         struct echo_page *ep;
414         ENTRY;
415
416         OBD_SLAB_ALLOC_PTR_GFP(ep, echo_page_kmem, CFS_ALLOC_IO);
417         if (ep != NULL) {
418                 struct echo_object *eco = cl2echo_obj(obj);
419                 ep->ep_vmpage = vmpage;
420                 page_cache_get(vmpage);
421                 cfs_mutex_init(&ep->ep_lock);
422                 cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
423                 cfs_atomic_inc(&eco->eo_npages);
424         }
425         RETURN(ERR_PTR(ep ? 0 : -ENOMEM));
426 }
427
/**
 * cl_object_operations::coo_io_init for the echo layer.
 *
 * \retval 0 always; no echo io slice is set up
 */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
                        struct cl_io *io)
{
        int rc = 0;

        return rc;
}
433
434 static int echo_lock_init(const struct lu_env *env,
435                           struct cl_object *obj, struct cl_lock *lock,
436                           const struct cl_io *unused)
437 {
438         struct echo_lock *el;
439         ENTRY;
440
441         OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, CFS_ALLOC_IO);
442         if (el != NULL) {
443                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
444                 el->el_object = cl2echo_obj(obj);
445                 CFS_INIT_LIST_HEAD(&el->el_chain);
446                 cfs_atomic_set(&el->el_refcount, 0);
447         }
448         RETURN(el == NULL ? -ENOMEM : 0);
449 }
450
/**
 * cl_object_operations::coo_conf_set for the echo layer.
 *
 * \retval 0 always; the configuration is ignored
 */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
                         const struct cl_object_conf *conf)
{
        int rc = 0;

        return rc;
}
456
/* cl_object operations of the echo layer, installed by echo_object_alloc(). */
static const struct cl_object_operations echo_cl_obj_ops = {
        .coo_page_init = echo_page_init,
        .coo_lock_init = echo_lock_init,
        .coo_io_init   = echo_io_init,
        .coo_conf_set  = echo_conf_set
};
463 /** @} echo_cl_ops */
464
465 /** \defgroup echo_lu_ops lu_object operations
466  *
467  * operations for echo lu object.
468  *
469  * @{
470  */
471 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
472                             const struct lu_object_conf *conf)
473 {
474         const struct cl_object_conf *cconf = lu2cl_conf(conf);
475         struct echo_object_conf *econf = cl2echo_conf(cconf);
476         struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
477         struct echo_client_obd *ec     = ed->ed_ec;
478         struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
479         ENTRY;
480
481         if (ed->ed_next) {
482                 struct lu_object  *below;
483                 struct lu_device  *under;
484
485                 under = ed->ed_next;
486                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
487                                                         under);
488                 if (below == NULL)
489                         RETURN(-ENOMEM);
490                 lu_object_add(obj, below);
491         }
492
493         LASSERT(econf->eoc_md);
494         eco->eo_lsm = *econf->eoc_md;
495         eco->eo_dev = ed;
496         cfs_atomic_set(&eco->eo_npages, 0);
497
498         /* clear the lsm pointer so that it won't get freed. */
499         *econf->eoc_md = NULL;
500
501         cfs_spin_lock(&ec->ec_lock);
502         cfs_list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
503         cfs_spin_unlock(&ec->ec_lock);
504
505         RETURN(0);
506 }
507
508 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
509 {
510         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
511         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
512         struct lov_stripe_md *lsm  = eco->eo_lsm;
513         ENTRY;
514
515         LASSERT(cfs_atomic_read(&eco->eo_npages) == 0);
516
517         cfs_spin_lock(&ec->ec_lock);
518         cfs_list_del_init(&eco->eo_obj_chain);
519         cfs_spin_unlock(&ec->ec_lock);
520
521         lu_object_fini(obj);
522         lu_object_header_fini(obj->lo_header);
523
524         if (lsm)
525                 obd_free_memmd(ec->ec_exp, &lsm);
526         OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
527         EXIT;
528 }
529
530 static int echo_object_print(const struct lu_env *env, void *cookie,
531                             lu_printer_t p, const struct lu_object *o)
532 {
533         struct echo_object *obj = cl2echo_obj(lu2cl(o));
534
535         return (*p)(env, cookie, "echoclient-object@%p", obj);
536 }
537
538
/*
 * lu_object operations of the echo layer, installed by echo_object_alloc().
 * The delete, release and invariant hooks are left unset.
 */
static const struct lu_object_operations echo_lu_obj_ops = {
        .loo_object_init      = echo_object_init,
        .loo_object_delete    = NULL,
        .loo_object_release   = NULL,
        .loo_object_free      = echo_object_free,
        .loo_object_print     = echo_object_print,
        .loo_object_invariant = NULL
};
547 /** @} echo_lu_ops */
548
549 /** \defgroup echo_lu_dev_ops  lu_device operations
550  *
551  * Operations for echo lu device.
552  *
553  * @{
554  */
555 static struct lu_object *echo_object_alloc(const struct lu_env *env,
556                                            const struct lu_object_header *hdr,
557                                            struct lu_device *dev)
558 {
559         struct echo_object *eco;
560         struct lu_object *obj = NULL;
561         ENTRY;
562
563         /* we're the top dev. */
564         LASSERT(hdr == NULL);
565         OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, CFS_ALLOC_IO);
566         if (eco != NULL) {
567                 struct cl_object_header *hdr = &eco->eo_hdr;
568
569                 obj = &echo_obj2cl(eco)->co_lu;
570                 cl_object_header_init(hdr);
571                 lu_object_init(obj, &hdr->coh_lu, dev);
572                 lu_object_add_top(&hdr->coh_lu, obj);
573
574                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
575                 obj->lo_ops       = &echo_lu_obj_ops;
576         }
577         RETURN(obj);
578 }
579
/* lu_device operations of the echo device; only object allocation is set. */
static struct lu_device_operations echo_device_lu_ops = {
        .ldo_object_alloc   = echo_object_alloc,
};
583 /** @} echo_lu_dev_ops */
584
/* cl_device operations of the echo device; no method is set. */
static struct cl_device_operations echo_device_cl_ops = {
};
587
588 /** \defgroup echo_init Setup and teardown
589  *
590  * Init and fini functions for echo client.
591  *
592  * @{
593  */
594 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
595 {
596         struct cl_site *site = &ed->ed_site_myself;
597         int rc;
598
599         /* initialize site */
600         rc = cl_site_init(site, &ed->ed_cl);
601         if (rc) {
602                 CERROR("Cannot initilize site for echo client(%d)\n", rc);
603                 return rc;
604         }
605
606         rc = lu_site_init_finish(&site->cs_lu);
607         if (rc)
608                 return rc;
609
610         ed->ed_site = site;
611         return 0;
612 }
613
614 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
615 {
616         if (ed->ed_site) {
617                 cl_site_fini(ed->ed_site);
618                 ed->ed_site = NULL;
619         }
620 }
621
622 static void *echo_thread_key_init(const struct lu_context *ctx,
623                           struct lu_context_key *key)
624 {
625         struct echo_thread_info *info;
626
627         OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, CFS_ALLOC_IO);
628         if (info == NULL)
629                 info = ERR_PTR(-ENOMEM);
630         return info;
631 }
632
633 static void echo_thread_key_fini(const struct lu_context *ctx,
634                          struct lu_context_key *key, void *data)
635 {
636         struct echo_thread_info *info = data;
637         OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
638 }
639
/**
 * lu_context_key::lct_exit for the echo thread key. Intentionally empty.
 */
static void echo_thread_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        /* nothing to do */
}
644
/* Context key for the per-thread echo_thread_info, tagged LCT_CL_THREAD. */
static struct lu_context_key echo_thread_key = {
        .lct_tags = LCT_CL_THREAD,
        .lct_init = echo_thread_key_init,
        .lct_fini = echo_thread_key_fini,
        .lct_exit = echo_thread_key_exit
};
651
652 static void *echo_session_key_init(const struct lu_context *ctx,
653                                   struct lu_context_key *key)
654 {
655         struct echo_session_info *session;
656
657         OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, CFS_ALLOC_IO);
658         if (session == NULL)
659                 session = ERR_PTR(-ENOMEM);
660         return session;
661 }
662
663 static void echo_session_key_fini(const struct lu_context *ctx,
664                                  struct lu_context_key *key, void *data)
665 {
666         struct echo_session_info *session = data;
667         OBD_SLAB_FREE_PTR(session, echo_session_kmem);
668 }
669
/**
 * lu_context_key::lct_exit for the echo session key. Intentionally empty.
 */
static void echo_session_key_exit(const struct lu_context *ctx,
                                 struct lu_context_key *key, void *data)
{
        /* nothing to do */
}
674
/* Context key for the per-session echo_session_info, tagged LCT_SESSION. */
static struct lu_context_key echo_session_key = {
        .lct_tags = LCT_SESSION,
        .lct_init = echo_session_key_init,
        .lct_fini = echo_session_key_fini,
        .lct_exit = echo_session_key_exit
};
681
/*
 * NOTE(review): presumably generates echo_type_init/fini/start/stop (used
 * in echo_device_type_ops below) for the two keys -- confirm against the
 * macro definition.
 */
LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
683
684 static struct lu_device *echo_device_alloc(const struct lu_env *env,
685                                            struct lu_device_type *t,
686                                            struct lustre_cfg *cfg)
687 {
688         struct lu_device   *next;
689         struct echo_device *ed;
690         struct cl_device   *cd;
691         struct obd_device  *obd = NULL; /* to keep compiler happy */
692         struct obd_device  *tgt;
693         const char *tgt_type_name;
694         int rc;
695         int cleanup = 0;
696         ENTRY;
697
698         OBD_ALLOC_PTR(ed);
699         if (ed == NULL)
700                 GOTO(out, rc = -ENOMEM);
701
702         cleanup = 1;
703         cd = &ed->ed_cl;
704         rc = cl_device_init(cd, t);
705         if (rc)
706                 GOTO(out, rc);
707
708         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
709         cd->cd_ops = &echo_device_cl_ops;
710
711         cleanup = 2;
712         rc = echo_site_init(env, ed);
713         if (rc)
714                 GOTO(out, rc);
715
716         cleanup = 3;
717         obd = class_name2obd(lustre_cfg_string(cfg, 0));
718         LASSERT(obd != NULL);
719         rc = echo_client_setup(obd, cfg);
720         if (rc)
721                 GOTO(out, rc);
722         ed->ed_ec = &obd->u.echo_client;
723
724         cleanup = 4;
725         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
726         LASSERT(tgt != NULL);
727         next = tgt->obd_lu_dev;
728         if (next != NULL && !lu_device_is_cl(next))
729                 next = NULL;
730
731         /*
732          * if echo client is to be stacked upon ost device, the next is NULL
733          * since ost is not a clio device so far
734          */
735         tgt_type_name = tgt->obd_type->typ_name;
736         if (next != NULL) {
737                 LASSERT(next != NULL);
738                 if (next->ld_site != NULL)
739                         GOTO(out, rc = -EBUSY);
740
741                 next->ld_site = &ed->ed_site->cs_lu;
742                 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
743                                              next->ld_type->ldt_name, NULL);
744                 if (rc)
745                         GOTO(out, rc);
746
747                 /* Trikcy case, I have to determine the obd type since clio
748                  * uses the different parameters to initialize objects for
749                  * lov & osc.
750                  */
751                 if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
752                         ed->ed_next_islov = 1;
753                 else
754                         LASSERT(strcmp(tgt_type_name, LUSTRE_OSC_NAME) == 0);
755         } else
756                 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
757
758         ed->ed_next = next;
759         RETURN(&cd->cd_lu_dev);
760
761 out:
762         switch(cleanup) {
763         case 4: {
764                 int rc2;
765                 rc2 = echo_client_cleanup(obd);
766                 if (rc2)
767                         CERROR("Cleanup obd device %s error(%d)\n",
768                                obd->obd_name, rc2);
769         }
770
771         case 3:
772                 echo_site_fini(env, ed);
773         case 2:
774                 cl_device_fini(&ed->ed_cl);
775         case 1:
776                 OBD_FREE_PTR(ed);
777         case 0:
778         default:
779                 break;
780         }
781         return(ERR_PTR(rc));
782 }
783
/**
 * lu_device_type_operations::ldto_device_init for the echo client.
 *
 * Never expected to be called: it hits LBUG() right away. The return
 * statement after it only satisfies the function signature.
 */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
                          const char *name, struct lu_device *next)
{
        LBUG();
        return 0;
}
790
791 static struct lu_device *echo_device_fini(const struct lu_env *env,
792                                           struct lu_device *d)
793 {
794         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
795         struct lu_device *next = ed->ed_next;
796
797         while (next)
798                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
799         return NULL;
800 }
801
802 static void echo_lock_release(const struct lu_env *env,
803                               struct echo_lock *ecl,
804                               int still_used)
805 {
806         struct cl_lock *clk = echo_lock2cl(ecl);
807
808         cl_lock_get(clk);
809         cl_unuse(env, clk);
810         cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
811         if (!still_used) {
812                 cl_lock_mutex_get(env, clk);
813                 cl_lock_cancel(env, clk);
814                 cl_lock_delete(env, clk);
815                 cl_lock_mutex_put(env, clk);
816         }
817         cl_lock_put(env, clk);
818 }
819
820 static struct lu_device *echo_device_free(const struct lu_env *env,
821                                           struct lu_device *d)
822 {
823         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
824         struct echo_client_obd *ec   = ed->ed_ec;
825         struct echo_object     *eco;
826         struct lu_device       *next = ed->ed_next;
827
828         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
829                ed, next);
830
831         LASSERT(ed->ed_site);
832         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
833
834         /* check if there are objects still alive.
835          * It shouldn't have any object because lu_site_purge would cleanup
836          * all of cached objects. Anyway, probably the echo device is being
837          * parallelly accessed.
838          */
839         cfs_spin_lock(&ec->ec_lock);
840         cfs_list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
841                 eco->eo_deleted = 1;
842         cfs_spin_unlock(&ec->ec_lock);
843
844         /* purge again */
845         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
846
847         CDEBUG(D_INFO,
848                "Waiting for the reference of echo object to be dropped\n");
849
850         /* Wait for the last reference to be dropped. */
851         cfs_spin_lock(&ec->ec_lock);
852         while (!cfs_list_empty(&ec->ec_objects)) {
853                 cfs_spin_unlock(&ec->ec_lock);
854                 CERROR("echo_client still has objects at cleanup time, "
855                        "wait for 1 second\n");
856                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
857                                                    cfs_time_seconds(1));
858                 cfs_spin_lock(&ec->ec_lock);
859         }
860         cfs_spin_unlock(&ec->ec_lock);
861
862         LASSERT(cfs_list_empty(&ec->ec_locks));
863
864         CDEBUG(D_INFO, "No object exists, exiting...\n");
865
866         echo_client_cleanup(d->ld_obd);
867
868         while (next)
869                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
870
871         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
872         echo_site_fini(env, ed);
873         cl_device_fini(&ed->ed_cl);
874         OBD_FREE_PTR(ed);
875
876         return NULL;
877 }
878
/*
 * Device type operations of the echo client.
 * NOTE(review): the echo_type_*() helpers are not defined in this part of
 * the file; presumably they come from LU_TYPE_INIT_FINI() above -- confirm.
 */
static const struct lu_device_type_operations echo_device_type_ops = {
        .ldto_init = echo_type_init,
        .ldto_fini = echo_type_fini,

        .ldto_start = echo_type_start,
        .ldto_stop  = echo_type_stop,

        .ldto_device_alloc = echo_device_alloc,
        .ldto_device_free  = echo_device_free,
        .ldto_device_init  = echo_device_init,
        .ldto_device_fini  = echo_device_fini
};
891
/* The echo client device type: a clio device named LUSTRE_ECHO_CLIENT_NAME. */
static struct lu_device_type echo_device_type = {
        .ldt_tags     = LU_DEVICE_CL,
        .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
        .ldt_ops      = &echo_device_type_ops,
        .ldt_ctx_tags = LCT_CL_THREAD
};
898 /** @} echo_init */
899
900 /** \defgroup echo_exports Exported operations
901  *
902  * exporting functions to echo client
903  *
904  * @{
905  */
906
907 /* Interfaces to echo client obd device */
908 static struct echo_object *cl_echo_object_find(struct echo_device *d,
909                                                struct lov_stripe_md **lsmp)
910 {
911         struct lu_env *env;
912         struct echo_thread_info *info;
913         struct echo_object_conf *conf;
914         struct lov_stripe_md    *lsm;
915         struct echo_object *eco;
916         struct cl_object   *obj;
917         struct lu_fid *fid;
918         int refcheck;
919         ENTRY;
920
921         LASSERT(lsmp);
922         lsm = *lsmp;
923         LASSERT(lsm);
924         LASSERT(lsm->lsm_object_id);
925
926         /* Never return an object if the obd is to be freed. */
927         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
928                 RETURN(ERR_PTR(-ENODEV));
929
930         env = cl_env_get(&refcheck);
931         if (IS_ERR(env))
932                 RETURN((void *)env);
933
934         info = echo_env_info(env);
935         conf = &info->eti_conf;
936         if (d->ed_next) {
937                 if (!d->ed_next_islov) {
938                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
939                         LASSERT(oinfo != NULL);
940                         oinfo->loi_id = lsm->lsm_object_id;
941                         oinfo->loi_seq = lsm->lsm_object_seq;
942                         conf->eoc_cl.u.coc_oinfo = oinfo;
943                 } else {
944                         struct lustre_md *md;
945                         md = &info->eti_md;
946                         memset(md, 0, sizeof *md);
947                         md->lsm = lsm;
948                         conf->eoc_cl.u.coc_md = md;
949                 }
950         }
951         conf->eoc_md = lsmp;
952
953         fid  = &info->eti_fid;
954         lsm2fid(lsm, fid);
955
956         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
957         if (IS_ERR(obj))
958                 GOTO(out, eco = (void*)obj);
959
960         eco = cl2echo_obj(obj);
961         if (eco->eo_deleted) {
962                 cl_object_put(env, obj);
963                 eco = ERR_PTR(-EAGAIN);
964         }
965
966 out:
967         cl_env_put(env, &refcheck);
968         RETURN(eco);
969 }
970
971 static int cl_echo_object_put(struct echo_object *eco)
972 {
973         struct lu_env *env;
974         struct cl_object *obj = echo_obj2cl(eco);
975         int refcheck;
976         ENTRY;
977
978         env = cl_env_get(&refcheck);
979         if (IS_ERR(env))
980                 RETURN(PTR_ERR(env));
981
982         /* an external function to kill an object? */
983         if (eco->eo_deleted) {
984                 struct lu_object_header *loh = obj->co_lu.lo_header;
985                 LASSERT(&eco->eo_hdr == luh2coh(loh));
986                 cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
987         }
988
989         cl_object_put(env, obj);
990         cl_env_put(env, &refcheck);
991         RETURN(0);
992 }
993
994 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
995                             obd_off start, obd_off end, int mode,
996                             __u64 *cookie , __u32 enqflags)
997 {
998         struct cl_io *io;
999         struct cl_lock *lck;
1000         struct cl_object *obj;
1001         struct cl_lock_descr *descr;
1002         struct echo_thread_info *info;
1003         int rc = -ENOMEM;
1004         ENTRY;
1005
1006         info = echo_env_info(env);
1007         io = &info->eti_io;
1008         descr = &info->eti_descr;
1009         obj = echo_obj2cl(eco);
1010
1011         descr->cld_obj   = obj;
1012         descr->cld_start = cl_index(obj, start);
1013         descr->cld_end   = cl_index(obj, end);
1014         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1015         descr->cld_enq_flags = enqflags;
1016         io->ci_obj = obj;
1017
1018         lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1019         if (lck) {
1020                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1021                 struct echo_lock *el;
1022
1023                 rc = cl_wait(env, lck);
1024                 if (rc == 0) {
1025                         el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1026                         cfs_spin_lock(&ec->ec_lock);
1027                         if (cfs_list_empty(&el->el_chain)) {
1028                                 cfs_list_add(&el->el_chain, &ec->ec_locks);
1029                                 el->el_cookie = ++ec->ec_unique;
1030                         }
1031                         cfs_atomic_inc(&el->el_refcount);
1032                         *cookie = el->el_cookie;
1033                         cfs_spin_unlock(&ec->ec_lock);
1034                 } else
1035                         cl_lock_release(env, lck, "ec enqueue", cfs_current());
1036         }
1037         RETURN(rc);
1038 }
1039
1040 static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end,
1041                            int mode, __u64 *cookie)
1042 {
1043         struct echo_thread_info *info;
1044         struct lu_env *env;
1045         struct cl_io *io;
1046         int refcheck;
1047         int result;
1048         ENTRY;
1049
1050         env = cl_env_get(&refcheck);
1051         if (IS_ERR(env))
1052                 RETURN(PTR_ERR(env));
1053
1054         info = echo_env_info(env);
1055         io = &info->eti_io;
1056
1057         result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
1058         if (result < 0)
1059                 GOTO(out, result);
1060         LASSERT(result == 0);
1061
1062         result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
1063         cl_io_fini(env, io);
1064
1065         EXIT;
1066 out:
1067         cl_env_put(env, &refcheck);
1068         return result;
1069 }
1070
1071 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1072                            __u64 cookie)
1073 {
1074         struct echo_client_obd *ec = ed->ed_ec;
1075         struct echo_lock       *ecl = NULL;
1076         cfs_list_t             *el;
1077         int found = 0, still_used = 0;
1078         ENTRY;
1079
1080         LASSERT(ec != NULL);
1081         cfs_spin_lock (&ec->ec_lock);
1082         cfs_list_for_each (el, &ec->ec_locks) {
1083                 ecl = cfs_list_entry (el, struct echo_lock, el_chain);
1084                 CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
1085                 found = (ecl->el_cookie == cookie);
1086                 if (found) {
1087                         if (cfs_atomic_dec_and_test(&ecl->el_refcount))
1088                                 cfs_list_del_init(&ecl->el_chain);
1089                         else
1090                                 still_used = 1;
1091                         break;
1092                 }
1093         }
1094         cfs_spin_unlock (&ec->ec_lock);
1095
1096         if (!found)
1097                 RETURN(-ENOENT);
1098
1099         echo_lock_release(env, ecl, still_used);
1100         RETURN(0);
1101 }
1102
1103 static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
1104 {
1105         struct lu_env *env;
1106         int refcheck;
1107         int rc;
1108         ENTRY;
1109
1110         env = cl_env_get(&refcheck);
1111         if (IS_ERR(env))
1112                 RETURN(PTR_ERR(env));
1113
1114         rc = cl_echo_cancel0(env, ed, cookie);
1115
1116         cl_env_put(env, &refcheck);
1117         RETURN(rc);
1118 }
1119
1120 static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1121                              enum cl_req_type unused, struct cl_2queue *queue)
1122 {
1123         struct cl_page *clp;
1124         struct cl_page *temp;
1125         int result = 0;
1126         ENTRY;
1127
1128         cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1129                 int rc;
1130                 rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1131                 if (rc == 0)
1132                         continue;
1133                 result = result ?: rc;
1134         }
1135         RETURN(result);
1136 }
1137
1138 static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
1139                               cfs_page_t **pages, int npages, int async)
1140 {
1141         struct lu_env           *env;
1142         struct echo_thread_info *info;
1143         struct cl_object        *obj = echo_obj2cl(eco);
1144         struct echo_device      *ed  = eco->eo_dev;
1145         struct cl_2queue        *queue;
1146         struct cl_io            *io;
1147         struct cl_page          *clp;
1148         struct lustre_handle    lh = { 0 };
1149         int page_size = cl_page_size(obj);
1150         int refcheck;
1151         int rc;
1152         int i;
1153         ENTRY;
1154
1155         LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1156         LASSERT(ed->ed_next != NULL);
1157         env = cl_env_get(&refcheck);
1158         if (IS_ERR(env))
1159                 RETURN(PTR_ERR(env));
1160
1161         info    = echo_env_info(env);
1162         io      = &info->eti_io;
1163         queue   = &info->eti_queue;
1164
1165         cl_2queue_init(queue);
1166         rc = cl_io_init(env, io, CIT_MISC, obj);
1167         if (rc < 0)
1168                 GOTO(out, rc);
1169         LASSERT(rc == 0);
1170
1171
1172         rc = cl_echo_enqueue0(env, eco, offset,
1173                               offset + npages * CFS_PAGE_SIZE - 1,
1174                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1175                               CEF_NEVER);
1176         if (rc < 0)
1177                 GOTO(error_lock, rc);
1178
1179         for (i = 0; i < npages; i++) {
1180                 LASSERT(pages[i]);
1181                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1182                                    pages[i], CPT_TRANSIENT);
1183                 if (IS_ERR(clp)) {
1184                         rc = PTR_ERR(clp);
1185                         break;
1186                 }
1187                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1188
1189                 rc = cl_page_own(env, io, clp);
1190                 if (rc) {
1191                         LASSERT(clp->cp_state == CPS_FREEING);
1192                         cl_page_put(env, clp);
1193                         break;
1194                 }
1195
1196                 cl_2queue_add(queue, clp);
1197
1198                 /* drop the reference count for cl_page_find, so that the page
1199                  * will be freed in cl_2queue_fini. */
1200                 cl_page_put(env, clp);
1201                 cl_page_clip(env, clp, 0, page_size);
1202
1203                 offset += page_size;
1204         }
1205
1206         if (rc == 0) {
1207                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1208
1209                 async = async && (typ == CRT_WRITE);
1210                 if (async)
1211                         rc = cl_echo_async_brw(env, io, typ, queue);
1212                 else
1213                         rc = cl_io_submit_sync(env, io, typ, queue,
1214                                                CRP_NORMAL, 0);
1215                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1216                        async ? "async" : "sync", rc);
1217         }
1218
1219         cl_echo_cancel0(env, ed, lh.cookie);
1220         EXIT;
1221 error_lock:
1222         cl_2queue_discard(env, io, queue);
1223         cl_2queue_disown(env, io, queue);
1224         cl_2queue_fini(env, queue);
1225         cl_io_fini(env, io);
1226 out:
1227         cl_env_put(env, &refcheck);
1228         return rc;
1229 }
1230 /** @} echo_exports */
1231
1232
/* Counter behind automatically chosen object ids: echo_create_object()
 * pre-increments it and uses the result whenever the stripe md still has
 * lsm_object_id == 0 after the caller's id (if any) was applied. */
1233 static obd_id last_object_id;
1234
1235 static int
1236 echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1237 {
1238         struct lov_stripe_md *ulsm = _ulsm;
1239         int nob, i;
1240
1241         nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1242         if (nob > ulsm_nob)
1243                 return (-EINVAL);
1244
1245         if (cfs_copy_to_user (ulsm, lsm, sizeof(ulsm)))
1246                 return (-EFAULT);
1247
1248         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1249                 if (cfs_copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
1250                                       sizeof(lsm->lsm_oinfo[0])))
1251                         return (-EFAULT);
1252         }
1253         return 0;
1254 }
1255
1256 static int
1257 echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
1258                  void *ulsm, int ulsm_nob)
1259 {
1260         struct echo_client_obd *ec = ed->ed_ec;
1261         int                     i;
1262
1263         if (ulsm_nob < sizeof (*lsm))
1264                 return (-EINVAL);
1265
1266         if (cfs_copy_from_user (lsm, ulsm, sizeof (*lsm)))
1267                 return (-EFAULT);
1268
1269         if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1270             lsm->lsm_magic != LOV_MAGIC ||
1271             (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1272             ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1273                 return (-EINVAL);
1274
1275
1276         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1277                 if (cfs_copy_from_user(lsm->lsm_oinfo[i],
1278                                        ((struct lov_stripe_md *)ulsm)-> \
1279                                        lsm_oinfo[i],
1280                                        sizeof(lsm->lsm_oinfo[0])))
1281                         return (-EFAULT);
1282         }
1283         return (0);
1284 }
1285
1286 static int echo_create_object(struct echo_device *ed, int on_target,
1287                               struct obdo *oa, void *ulsm, int ulsm_nob,
1288                               struct obd_trans_info *oti)
1289 {
1290         struct echo_object     *eco;
1291         struct echo_client_obd *ec = ed->ed_ec;
1292         struct lov_stripe_md   *lsm = NULL;
1293         int                     rc;
1294         int                     created = 0;
1295         ENTRY;
1296
1297         if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
1298             (on_target ||                       /* set_stripe */
1299              ec->ec_nstripes != 0)) {           /* LOV */
1300                 CERROR ("No valid oid\n");
1301                 RETURN(-EINVAL);
1302         }
1303
1304         rc = obd_alloc_memmd(ec->ec_exp, &lsm);
1305         if (rc < 0) {
1306                 CERROR("Cannot allocate md, rc = %d\n", rc);
1307                 GOTO(failed, rc);
1308         }
1309
1310         if (ulsm != NULL) {
1311                 int i, idx;
1312
1313                 rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob);
1314                 if (rc != 0)
1315                         GOTO(failed, rc);
1316
1317                 if (lsm->lsm_stripe_count == 0)
1318                         lsm->lsm_stripe_count = ec->ec_nstripes;
1319
1320                 if (lsm->lsm_stripe_size == 0)
1321                         lsm->lsm_stripe_size = CFS_PAGE_SIZE;
1322
1323                 idx = cfs_rand();
1324
1325                 /* setup stripes: indices + default ids if required */
1326                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1327                         if (lsm->lsm_oinfo[i]->loi_id == 0)
1328                                 lsm->lsm_oinfo[i]->loi_id = lsm->lsm_object_id;
1329
1330                         lsm->lsm_oinfo[i]->loi_ost_idx =
1331                                 (idx + i) % ec->ec_nstripes;
1332                 }
1333         }
1334
1335         /* setup object ID here for !on_target and LOV hint */
1336         if (oa->o_valid & OBD_MD_FLID)
1337                 lsm->lsm_object_id = oa->o_id;
1338
1339         if (lsm->lsm_object_id == 0)
1340                 lsm->lsm_object_id = ++last_object_id;
1341
1342         rc = 0;
1343         if (on_target) {
1344                 /* Only echo objects are allowed to be created */
1345                 LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
1346                         (oa->o_seq == FID_SEQ_ECHO));
1347                 rc = obd_create(ec->ec_exp, oa, &lsm, oti);
1348                 if (rc != 0) {
1349                         CERROR("Cannot create objects, rc = %d\n", rc);
1350                         GOTO(failed, rc);
1351                 }
1352                 created = 1;
1353         }
1354
1355         /* See what object ID we were given */
1356         oa->o_id = lsm->lsm_object_id;
1357         oa->o_valid |= OBD_MD_FLID;
1358
1359         eco = cl_echo_object_find(ed, &lsm);
1360         if (IS_ERR(eco))
1361                 GOTO(failed, rc = PTR_ERR(eco));
1362         cl_echo_object_put(eco);
1363
1364         CDEBUG(D_INFO, "oa->o_id = %lx\n", (long)oa->o_id);
1365         EXIT;
1366
1367  failed:
1368         if (created && rc)
1369                 obd_destroy(ec->ec_exp, oa, lsm, oti, NULL, NULL);
1370         if (lsm)
1371                 obd_free_memmd(ec->ec_exp, &lsm);
1372         if (rc)
1373                 CERROR("create object failed with rc = %d\n", rc);
1374         return (rc);
1375 }
1376
1377 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1378                            struct obdo *oa)
1379 {
1380         struct echo_client_obd *ec  = ed->ed_ec;
1381         struct lov_stripe_md   *lsm = NULL;
1382         struct echo_object     *eco;
1383         int                     rc;
1384         ENTRY;
1385
1386         if ((oa->o_valid & OBD_MD_FLID) == 0 ||
1387             oa->o_id == 0)  /* disallow use of object id 0 */
1388         {
1389                 CERROR ("No valid oid\n");
1390                 RETURN(-EINVAL);
1391         }
1392
1393         rc = obd_alloc_memmd(ec->ec_exp, &lsm);
1394         if (rc < 0)
1395                 RETURN(rc);
1396
1397         lsm->lsm_object_id = oa->o_id;
1398         if (oa->o_valid & OBD_MD_FLGROUP)
1399                 lsm->lsm_object_seq = oa->o_seq;
1400         else
1401                 lsm->lsm_object_seq = FID_SEQ_ECHO;
1402
1403         rc = 0;
1404         eco = cl_echo_object_find(ed, &lsm);
1405         if (!IS_ERR(eco))
1406                 *ecop = eco;
1407         else
1408                 rc = PTR_ERR(eco);
1409         if (lsm)
1410                 obd_free_memmd(ec->ec_exp, &lsm);
1411         RETURN(rc);
1412 }
1413
/**
 * Release a reference on \a eco via cl_echo_object_put().
 *
 * The function returns nothing, so a failure to drop the reference can only
 * be reported in the log.
 */
static void echo_put_object(struct echo_object *eco)
{
        if (cl_echo_object_put(eco))
                /* terminate the message like every other CERROR() here */
                CERROR("echo client: drop an object failed\n");
}
1419
1420 static void
1421 echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
1422 {
1423         unsigned long stripe_count;
1424         unsigned long stripe_size;
1425         unsigned long width;
1426         unsigned long woffset;
1427         int           stripe_index;
1428         obd_off       offset;
1429
1430         if (lsm->lsm_stripe_count <= 1)
1431                 return;
1432
1433         offset       = *offp;
1434         stripe_size  = lsm->lsm_stripe_size;
1435         stripe_count = lsm->lsm_stripe_count;
1436
1437         /* width = # bytes in all stripes */
1438         width = stripe_size * stripe_count;
1439
1440         /* woffset = offset within a width; offset = whole number of widths */
1441         woffset = do_div (offset, width);
1442
1443         stripe_index = woffset / stripe_size;
1444
1445         *idp = lsm->lsm_oinfo[stripe_index]->loi_id;
1446         *offp = offset * stripe_size + woffset % stripe_size;
1447 }
1448
1449 static void
1450 echo_client_page_debug_setup(struct lov_stripe_md *lsm,
1451                              cfs_page_t *page, int rw, obd_id id,
1452                              obd_off offset, obd_off count)
1453 {
1454         char    *addr;
1455         obd_off  stripe_off;
1456         obd_id   stripe_id;
1457         int      delta;
1458
1459         /* no partial pages on the client */
1460         LASSERT(count == CFS_PAGE_SIZE);
1461
1462         addr = cfs_kmap(page);
1463
1464         for (delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1465                 if (rw == OBD_BRW_WRITE) {
1466                         stripe_off = offset + delta;
1467                         stripe_id = id;
1468                         echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1469                 } else {
1470                         stripe_off = 0xdeadbeef00c0ffeeULL;
1471                         stripe_id = 0xdeadbeef00c0ffeeULL;
1472                 }
1473                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1474                                   stripe_off, stripe_id);
1475         }
1476
1477         cfs_kunmap(page);
1478 }
1479
1480 static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
1481                                         cfs_page_t *page, obd_id id,
1482                                         obd_off offset, obd_off count)
1483 {
1484         obd_off stripe_off;
1485         obd_id  stripe_id;
1486         char   *addr;
1487         int     delta;
1488         int     rc;
1489         int     rc2;
1490
1491         /* no partial pages on the client */
1492         LASSERT(count == CFS_PAGE_SIZE);
1493
1494         addr = cfs_kmap(page);
1495
1496         for (rc = delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1497                 stripe_off = offset + delta;
1498                 stripe_id = id;
1499                 echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
1500
1501                 rc2 = block_debug_check("test_brw",
1502                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
1503                                         stripe_off, stripe_id);
1504                 if (rc2 != 0) {
1505                         CERROR ("Error in echo object "LPX64"\n", id);
1506                         rc = rc2;
1507                 }
1508         }
1509
1510         cfs_kunmap(page);
1511         return rc;
1512 }
1513
1514 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1515                             struct echo_object *eco, obd_off offset,
1516                             obd_size count, int async,
1517                             struct obd_trans_info *oti)
1518 {
1519         struct echo_client_obd *ec  = ed->ed_ec;
1520         struct lov_stripe_md   *lsm = eco->eo_lsm;
1521         obd_count               npages;
1522         struct brw_page        *pga;
1523         struct brw_page        *pgp;
1524         cfs_page_t            **pages;
1525         obd_off                 off;
1526         int                     i;
1527         int                     rc;
1528         int                     verify;
1529         int                     gfp_mask;
1530         int                     brw_flags = 0;
1531         ENTRY;
1532
1533         verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
1534                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1535                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1536
1537         gfp_mask = ((oa->o_id & 2) == 0) ? CFS_ALLOC_STD : CFS_ALLOC_HIGHUSER;
1538
1539         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1540         LASSERT(lsm != NULL);
1541         LASSERT(lsm->lsm_object_id == oa->o_id);
1542
1543         if (count <= 0 ||
1544             (count & (~CFS_PAGE_MASK)) != 0)
1545                 RETURN(-EINVAL);
1546
1547         /* XXX think again with misaligned I/O */
1548         npages = count >> CFS_PAGE_SHIFT;
1549
1550         if (rw == OBD_BRW_WRITE)
1551                 brw_flags = OBD_BRW_ASYNC;
1552
1553         OBD_ALLOC(pga, npages * sizeof(*pga));
1554         if (pga == NULL)
1555                 RETURN(-ENOMEM);
1556
1557         OBD_ALLOC(pages, npages * sizeof(*pages));
1558         if (pages == NULL) {
1559                 OBD_FREE(pga, npages * sizeof(*pga));
1560                 RETURN(-ENOMEM);
1561         }
1562
1563         for (i = 0, pgp = pga, off = offset;
1564              i < npages;
1565              i++, pgp++, off += CFS_PAGE_SIZE) {
1566
1567                 LASSERT (pgp->pg == NULL);      /* for cleanup */
1568
1569                 rc = -ENOMEM;
1570                 OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
1571                 if (pgp->pg == NULL)
1572                         goto out;
1573
1574                 pages[i] = pgp->pg;
1575                 pgp->count = CFS_PAGE_SIZE;
1576                 pgp->off = off;
1577                 pgp->flag = brw_flags;
1578
1579                 if (verify)
1580                         echo_client_page_debug_setup(lsm, pgp->pg, rw,
1581                                                      oa->o_id, off, pgp->count);
1582         }
1583
1584         if (ed->ed_next == NULL) {
1585                 struct obd_info oinfo = { { { 0 } } };
1586                 oinfo.oi_oa = oa;
1587                 oinfo.oi_md = lsm;
1588                 rc = obd_brw(rw, ec->ec_exp, &oinfo, npages, pga, oti);
1589         } else
1590                 rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1591
1592  out:
1593         if (rc != 0 || rw != OBD_BRW_READ)
1594                 verify = 0;
1595
1596         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1597                 if (pgp->pg == NULL)
1598                         continue;
1599
1600                 if (verify) {
1601                         int vrc;
1602                         vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id,
1603                                                            pgp->off, pgp->count);
1604                         if (vrc != 0 && rc == 0)
1605                                 rc = vrc;
1606                 }
1607                 OBD_PAGE_FREE(pgp->pg);
1608         }
1609         OBD_FREE(pga, npages * sizeof(*pga));
1610         OBD_FREE(pages, npages * sizeof(*pages));
1611         RETURN(rc);
1612 }
1613
1614 static int echo_client_prep_commit(struct obd_export *exp, int rw,
1615                                    struct obdo *oa, struct echo_object *eco,
1616                                    obd_off offset, obd_size count,
1617                                    obd_size batch, struct obd_trans_info *oti)
1618 {
1619         struct lov_stripe_md *lsm = eco->eo_lsm;
1620         struct obd_ioobj ioo;
1621         struct niobuf_local *lnb;
1622         struct niobuf_remote *rnb;
1623         obd_off off;
1624         obd_size npages, tot_pages;
1625         int i, ret = 0;
1626         ENTRY;
1627
1628         if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
1629             (lsm != NULL && lsm->lsm_object_id != oa->o_id))
1630                 RETURN(-EINVAL);
1631
1632         npages = batch >> CFS_PAGE_SHIFT;
1633         tot_pages = count >> CFS_PAGE_SHIFT;
1634
1635         OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
1636         OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
1637
1638         if (lnb == NULL || rnb == NULL)
1639                 GOTO(out, ret = -ENOMEM);
1640
1641         obdo_to_ioobj(oa, &ioo);
1642
1643         off = offset;
1644
1645         for(; tot_pages; tot_pages -= npages) {
1646                 int lpages;
1647
1648                 if (tot_pages < npages)
1649                         npages = tot_pages;
1650
1651                 for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) {
1652                         rnb[i].offset = off;
1653                         rnb[i].len = CFS_PAGE_SIZE;
1654                 }
1655
1656                 ioo.ioo_bufcnt = npages;
1657                 oti->oti_transno = 0;
1658
1659                 lpages = npages;
1660                 ret = obd_preprw(rw, exp, oa, 1, &ioo, rnb, &lpages, lnb, oti,
1661                                  NULL);
1662                 if (ret != 0)
1663                         GOTO(out, ret);
1664                 LASSERT(lpages == npages);
1665
1666                 for (i = 0; i < lpages; i++) {
1667                         cfs_page_t *page = lnb[i].page;
1668
1669                         /* read past eof? */
1670                         if (page == NULL && lnb[i].rc == 0)
1671                                 continue;
1672
1673                         if (oa->o_id == ECHO_PERSISTENT_OBJID ||
1674                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1675                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1676                                 continue;
1677
1678                         if (rw == OBD_BRW_WRITE)
1679                                 echo_client_page_debug_setup(lsm, page, rw,
1680                                                              oa->o_id,
1681                                                              rnb[i].offset,
1682                                                              rnb[i].len);
1683                         else
1684                                 echo_client_page_debug_check(lsm, page,
1685                                                              oa->o_id,
1686                                                              rnb[i].offset,
1687                                                              rnb[i].len);
1688                 }
1689
1690                 ret = obd_commitrw(rw, exp, oa, 1,&ioo,rnb,npages,lnb,oti,ret);
1691                 if (ret != 0)
1692                         GOTO(out, ret);
1693
1694                 /* Reset oti otherwise it would confuse ldiskfs. */
1695                 memset(oti, 0, sizeof(*oti));
1696         }
1697
1698 out:
1699         if (lnb)
1700                 OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
1701         if (rnb)
1702                 OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
1703         RETURN(ret);
1704 }
1705
1706 static int echo_client_brw_ioctl(int rw, struct obd_export *exp,
1707                                  struct obd_ioctl_data *data)
1708 {
1709         struct obd_device *obd = class_exp2obd(exp);
1710         struct echo_device *ed = obd2echo_dev(obd);
1711         struct echo_client_obd *ec = ed->ed_ec;
1712         struct obd_trans_info dummy_oti = { 0 };
1713         struct obdo *oa = &data->ioc_obdo1;
1714         struct echo_object *eco;
1715         int rc;
1716         int async = 1;
1717         ENTRY;
1718
1719         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1720
1721         rc = echo_get_object(&eco, ed, oa);
1722         if (rc)
1723                 RETURN(rc);
1724
1725         oa->o_valid &= ~OBD_MD_FLHANDLE;
1726
1727         switch((long)data->ioc_pbuf1) {
1728         case 1:
1729                 async = 0;
1730                 /* fall through */
1731         case 2:
1732                 rc = echo_client_kbrw(ed, rw, oa,
1733                                       eco, data->ioc_offset,
1734                                       data->ioc_count, async, &dummy_oti);
1735                 break;
1736         case 3:
1737                 rc = echo_client_prep_commit(ec->ec_exp, rw, oa,
1738                                             eco, data->ioc_offset,
1739                                             data->ioc_count, data->ioc_plen1,
1740                                             &dummy_oti);
1741                 break;
1742         default:
1743                 rc = -EINVAL;
1744         }
1745         echo_put_object(eco);
1746         RETURN(rc);
1747 }
1748
1749 static int
1750 echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
1751                     int mode, obd_off offset, obd_size nob)
1752 {
1753         struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
1754         struct lustre_handle   *ulh = &oa->o_handle;
1755         struct echo_object     *eco;
1756         obd_off                 end;
1757         int                     rc;
1758         ENTRY;
1759
1760         if (ed->ed_next == NULL)
1761                 RETURN(-EOPNOTSUPP);
1762
1763         if (!(mode == LCK_PR || mode == LCK_PW))
1764                 RETURN(-EINVAL);
1765
1766         if ((offset & (~CFS_PAGE_MASK)) != 0 ||
1767             (nob & (~CFS_PAGE_MASK)) != 0)
1768                 RETURN(-EINVAL);
1769
1770         rc = echo_get_object (&eco, ed, oa);
1771         if (rc != 0)
1772                 RETURN(rc);
1773
1774         end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
1775         rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
1776         if (rc == 0) {
1777                 oa->o_valid |= OBD_MD_FLHANDLE;
1778                 CDEBUG(D_INFO, "Cookie is "LPX64"\n", ulh->cookie);
1779         }
1780         echo_put_object(eco);
1781         RETURN(rc);
1782 }
1783
1784 static int
1785 echo_client_cancel(struct obd_export *exp, struct obdo *oa)
1786 {
1787         struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
1788         __u64               cookie = oa->o_handle.cookie;
1789
1790         if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
1791                 return -EINVAL;
1792
1793         CDEBUG(D_INFO, "Cookie is "LPX64"\n", cookie);
1794         return cl_echo_cancel(ed, cookie);
1795 }
1796
1797 static int
1798 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
1799                       int len, void *karg, void *uarg)
1800 {
1801         struct obd_device      *obd = exp->exp_obd;
1802         struct echo_device     *ed = obd2echo_dev(obd);
1803         struct echo_client_obd *ec = ed->ed_ec;
1804         struct echo_object     *eco;
1805         struct obd_ioctl_data  *data = karg;
1806         struct obd_trans_info   dummy_oti;
1807         struct oti_req_ack_lock *ack_lock;
1808         struct obdo            *oa;
1809         struct lu_fid           fid;
1810         int                     rw = OBD_BRW_READ;
1811         int                     rc = 0;
1812         int                     i;
1813         ENTRY;
1814
1815 #ifndef HAVE_UNLOCKED_IOCTL
1816         cfs_unlock_kernel();
1817 #endif
1818
1819         memset(&dummy_oti, 0, sizeof(dummy_oti));
1820
1821         oa = &data->ioc_obdo1;
1822         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1823                 oa->o_valid |= OBD_MD_FLGROUP;
1824                 oa->o_seq = FID_SEQ_ECHO;
1825         }
1826
1827         /* This FID is unpacked just for validation at this point */
1828         rc = fid_ostid_unpack(&fid, &oa->o_oi, 0);
1829         if (rc < 0)
1830                 RETURN(rc);
1831
1832         switch (cmd) {
1833         case OBD_IOC_CREATE:                    /* may create echo object */
1834                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1835                         GOTO (out, rc = -EPERM);
1836
1837                 rc = echo_create_object (ed, 1, oa,
1838                                          data->ioc_pbuf1, data->ioc_plen1,
1839                                          &dummy_oti);
1840                 GOTO(out, rc);
1841
1842         case OBD_IOC_DESTROY:
1843                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1844                         GOTO (out, rc = -EPERM);
1845
1846                 rc = echo_get_object (&eco, ed, oa);
1847                 if (rc == 0) {
1848                         rc = obd_destroy(ec->ec_exp, oa, eco->eo_lsm,
1849                                          &dummy_oti, NULL, NULL);
1850                         if (rc == 0)
1851                                 eco->eo_deleted = 1;
1852                         echo_put_object(eco);
1853                 }
1854                 GOTO(out, rc);
1855
1856         case OBD_IOC_GETATTR:
1857                 rc = echo_get_object (&eco, ed, oa);
1858                 if (rc == 0) {
1859                         struct obd_info oinfo = { { { 0 } } };
1860                         oinfo.oi_md = eco->eo_lsm;
1861                         oinfo.oi_oa = oa;
1862                         rc = obd_getattr(ec->ec_exp, &oinfo);
1863                         echo_put_object(eco);
1864                 }
1865                 GOTO(out, rc);
1866
1867         case OBD_IOC_SETATTR:
1868                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1869                         GOTO (out, rc = -EPERM);
1870
1871                 rc = echo_get_object (&eco, ed, oa);
1872                 if (rc == 0) {
1873                         struct obd_info oinfo = { { { 0 } } };
1874                         oinfo.oi_oa = oa;
1875                         oinfo.oi_md = eco->eo_lsm;
1876
1877                         rc = obd_setattr(ec->ec_exp, &oinfo, NULL);
1878                         echo_put_object(eco);
1879                 }
1880                 GOTO(out, rc);
1881
1882         case OBD_IOC_BRW_WRITE:
1883                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1884                         GOTO (out, rc = -EPERM);
1885
1886                 rw = OBD_BRW_WRITE;
1887                 /* fall through */
1888         case OBD_IOC_BRW_READ:
1889                 rc = echo_client_brw_ioctl(rw, exp, data);
1890                 GOTO(out, rc);
1891
1892         case ECHO_IOC_GET_STRIPE:
1893                 rc = echo_get_object(&eco, ed, oa);
1894                 if (rc == 0) {
1895                         rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
1896                                               data->ioc_plen1);
1897                         echo_put_object(eco);
1898                 }
1899                 GOTO(out, rc);
1900
1901         case ECHO_IOC_SET_STRIPE:
1902                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1903                         GOTO (out, rc = -EPERM);
1904
1905                 if (data->ioc_pbuf1 == NULL) {  /* unset */
1906                         rc = echo_get_object(&eco, ed, oa);
1907                         if (rc == 0) {
1908                                 eco->eo_deleted = 1;
1909                                 echo_put_object(eco);
1910                         }
1911                 } else {
1912                         rc = echo_create_object(ed, 0, oa,
1913                                                 data->ioc_pbuf1,
1914                                                 data->ioc_plen1, &dummy_oti);
1915                 }
1916                 GOTO (out, rc);
1917
1918         case ECHO_IOC_ENQUEUE:
1919                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1920                         GOTO (out, rc = -EPERM);
1921
1922                 rc = echo_client_enqueue(exp, oa,
1923                                          data->ioc_conn1, /* lock mode */
1924                                          data->ioc_offset,
1925                                          data->ioc_count);/*extent*/
1926                 GOTO (out, rc);
1927
1928         case ECHO_IOC_CANCEL:
1929                 rc = echo_client_cancel(exp, oa);
1930                 GOTO (out, rc);
1931
1932         default:
1933                 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
1934                 GOTO (out, rc = -ENOTTY);
1935         }
1936
1937         EXIT;
1938  out:
1939
1940         /* XXX this should be in a helper also called by target_send_reply */
1941         for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
1942              i++, ack_lock++) {
1943                 if (!ack_lock->mode)
1944                         break;
1945                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
1946         }
1947
1948 #ifndef HAVE_UNLOCKED_IOCTL
1949         cfs_lock_kernel();
1950 #endif
1951
1952         return rc;
1953 }
1954
1955 static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
1956 {
1957         struct echo_client_obd *ec = &obddev->u.echo_client;
1958         struct obd_device *tgt;
1959         struct obd_uuid echo_uuid = { "ECHO_UUID" };
1960         struct obd_connect_data *ocd = NULL;
1961         int rc;
1962         ENTRY;
1963
1964         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1965                 CERROR("requires a TARGET OBD name\n");
1966                 RETURN(-EINVAL);
1967         }
1968
1969         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
1970         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
1971                 CERROR("device not attached or not set up (%s)\n",
1972                        lustre_cfg_string(lcfg, 1));
1973                 RETURN(-EINVAL);
1974         }
1975
1976         cfs_spin_lock_init (&ec->ec_lock);
1977         CFS_INIT_LIST_HEAD (&ec->ec_objects);
1978         CFS_INIT_LIST_HEAD (&ec->ec_locks);
1979         ec->ec_unique = 0;
1980         ec->ec_nstripes = 0;
1981
1982         OBD_ALLOC(ocd, sizeof(*ocd));
1983         if (ocd == NULL) {
1984                 CERROR("Can't alloc ocd connecting to %s\n",
1985                        lustre_cfg_string(lcfg, 1));
1986                 return -ENOMEM;
1987         }
1988
1989         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
1990                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
1991                                  OBD_CONNECT_64BITHASH;
1992         ocd->ocd_version = LUSTRE_VERSION_CODE;
1993         ocd->ocd_group = FID_SEQ_ECHO;
1994
1995         rc = obd_connect(NULL, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
1996         if (rc == 0) {
1997                 /* Turn off pinger because it connects to tgt obd directly. */
1998                 cfs_spin_lock(&tgt->obd_dev_lock);
1999                 cfs_list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2000                 cfs_spin_unlock(&tgt->obd_dev_lock);
2001         }
2002
2003         OBD_FREE(ocd, sizeof(*ocd));
2004
2005         if (rc != 0) {
2006                 CERROR("fail to connect to device %s\n",
2007                        lustre_cfg_string(lcfg, 1));
2008                 return (rc);
2009         }
2010
2011         RETURN(rc);
2012 }
2013
2014 static int echo_client_cleanup(struct obd_device *obddev)
2015 {
2016         struct echo_client_obd *ec = &obddev->u.echo_client;
2017         int rc;
2018         ENTRY;
2019
2020         if (!cfs_list_empty(&obddev->obd_exports)) {
2021                 CERROR("still has clients!\n");
2022                 RETURN(-EBUSY);
2023         }
2024
2025         LASSERT(cfs_atomic_read(&ec->ec_exp->exp_refcount) > 0);
2026         rc = obd_disconnect(ec->ec_exp);
2027         if (rc != 0)
2028                 CERROR("fail to disconnect device: %d\n", rc);
2029
2030         RETURN(rc);
2031 }
2032
2033 static int echo_client_connect(const struct lu_env *env,
2034                                struct obd_export **exp,
2035                                struct obd_device *src, struct obd_uuid *cluuid,
2036                                struct obd_connect_data *data, void *localdata)
2037 {
2038         int                rc;
2039         struct lustre_handle conn = { 0 };
2040
2041         ENTRY;
2042         rc = class_connect(&conn, src, cluuid);
2043         if (rc == 0) {
2044                 *exp = class_conn2export(&conn);
2045         }
2046
2047         RETURN (rc);
2048 }
2049
2050 static int echo_client_disconnect(struct obd_export *exp)
2051 {
2052 #if 0
2053         struct obd_device      *obd;
2054         struct echo_client_obd *ec;
2055         struct ec_lock         *ecl;
2056 #endif
2057         int                     rc;
2058         ENTRY;
2059
2060         if (exp == NULL)
2061                 GOTO(out, rc = -EINVAL);
2062
2063 #if 0
2064         obd = exp->exp_obd;
2065         ec = &obd->u.echo_client;
2066
2067         /* no more contention on export's lock list */
2068         while (!cfs_list_empty (&exp->exp_ec_data.eced_locks)) {
2069                 ecl = cfs_list_entry (exp->exp_ec_data.eced_locks.next,
2070                                       struct ec_lock, ecl_exp_chain);
2071                 cfs_list_del (&ecl->ecl_exp_chain);
2072
2073                 rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
2074                                  ecl->ecl_mode, &ecl->ecl_lock_handle);
2075
2076                 CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
2077                         "(%d)\n", ecl->ecl_object->eco_id, rc);
2078
2079                 echo_put_object (ecl->ecl_object);
2080                 OBD_FREE (ecl, sizeof (*ecl));
2081         }
2082 #endif
2083
2084         rc = class_disconnect(exp);
2085         GOTO(out, rc);
2086  out:
2087         return rc;
2088 }
2089
2090 static struct obd_ops echo_obd_ops = {
2091         .o_owner       = THIS_MODULE,
2092
2093 #if 0
2094         .o_setup       = echo_client_setup,
2095         .o_cleanup     = echo_client_cleanup,
2096 #endif
2097
2098         .o_iocontrol   = echo_client_iocontrol,
2099         .o_connect     = echo_client_connect,
2100         .o_disconnect  = echo_client_disconnect
2101 };
2102
2103 int echo_client_init(void)
2104 {
2105         struct lprocfs_static_vars lvars = { 0 };
2106         int rc;
2107
2108         lprocfs_echo_init_vars(&lvars);
2109
2110         rc = lu_kmem_init(echo_caches);
2111         if (rc == 0) {
2112                 rc = class_register_type(&echo_obd_ops, NULL,
2113                                          lvars.module_vars,
2114                                          LUSTRE_ECHO_CLIENT_NAME,
2115                                          &echo_device_type);
2116                 if (rc)
2117                         lu_kmem_fini(echo_caches);
2118         }
2119         return rc;
2120 }
2121
2122 void echo_client_exit(void)
2123 {
2124         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2125         lu_kmem_fini(echo_caches);
2126 }
2127
2128 /** @} echo_client */