Whamcloud - gitweb
fix module initialization
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #define DEBUG_SUBSYSTEM S_ECHO
26 #ifdef __KERNEL__
27 #include <linux/version.h>
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/completion.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/iobuf.h>
33 #endif
34 #include <asm/div64.h>
35 #include <linux/smp_lock.h>
36 #else
37 #include <liblustre.h>
38 #endif
39
40 #include <linux/obd.h>
41 #include <linux/obd_support.h>
42 #include <linux/obd_class.h>
43 #include <linux/obd_echo.h>
44 #include <linux/lustre_ver.h>
45 #include <linux/lustre_debug.h>
46 #include <linux/lprocfs_status.h>
47
48 static obd_id last_object_id;
49
#if 0
/* Debug helper (currently compiled out): prints one summary line for an
 * echo object, then one line per stripe of its striping metadata. */
static void
echo_printk_object (char *msg, struct ec_object *eco)
{
        struct lov_stripe_md *lsm = eco->eco_lsm;
        int                   stripe = 0;

        printk (KERN_INFO "Lustre: %s: object %p: "LPX64", refs %d%s: "LPX64
                "=%u!%u\n", msg, eco, eco->eco_id, eco->eco_refcount,
                eco->eco_deleted ? "(deleted) " : "",
                lsm->lsm_object_id, lsm->lsm_stripe_size,
                lsm->lsm_stripe_count);

        while (stripe < lsm->lsm_stripe_count) {
                printk (KERN_INFO "Lustre:   @%2u:"LPX64"\n",
                        lsm->lsm_oinfo[stripe].loi_ost_idx,
                        lsm->lsm_oinfo[stripe].loi_id);
                stripe++;
        }
}
#endif
69
70 static struct ec_object *
71 echo_find_object_locked (struct obd_device *obd, obd_id id)
72 {
73         struct echo_client_obd *ec = &obd->u.echo_client;
74         struct ec_object       *eco = NULL;
75         struct list_head       *el;
76
77         list_for_each (el, &ec->ec_objects) {
78                 eco = list_entry (el, struct ec_object, eco_obj_chain);
79
80                 if (eco->eco_id == id)
81                         return (eco);
82         }
83         return (NULL);
84 }
85
86 static int
87 echo_copyout_lsm (struct lov_stripe_md *lsm, void *ulsm, int ulsm_nob)
88 {
89         int nob;
90
91         nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
92         if (nob > ulsm_nob)
93                 return (-EINVAL);
94
95         if (copy_to_user (ulsm, lsm, nob))
96                 return (-EFAULT);
97
98         return (0);
99 }
100
101 static int
102 echo_copyin_lsm (struct obd_device *obd, struct lov_stripe_md *lsm,
103                  void *ulsm, int ulsm_nob)
104 {
105         struct echo_client_obd *ec = &obd->u.echo_client;
106         int                     nob;
107
108         if (ulsm_nob < sizeof (*lsm))
109                 return (-EINVAL);
110
111         if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
112                 return (-EFAULT);
113
114         nob = lsm->lsm_stripe_count * sizeof (lsm->lsm_oinfo[0]);
115
116         if (ulsm_nob < nob ||
117             lsm->lsm_stripe_count > ec->ec_nstripes ||
118             lsm->lsm_magic != LOV_MAGIC ||
119             (lsm->lsm_stripe_size & (PAGE_SIZE - 1)) != 0 ||
120             ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
121                 return (-EINVAL);
122
123         if (copy_from_user(lsm->lsm_oinfo,
124                            ((struct lov_stripe_md *)ulsm)->lsm_oinfo, nob))
125                 return (-EFAULT);
126
127         return (0);
128 }
129
130 static struct ec_object *
131 echo_allocate_object (struct obd_device *obd)
132 {
133         struct echo_client_obd *ec = &obd->u.echo_client;
134         struct ec_object       *eco;
135         int rc;
136
137         OBD_ALLOC(eco, sizeof (*eco));
138         if (eco == NULL)
139                 return NULL;
140
141         rc = obd_alloc_memmd(ec->ec_exp, &eco->eco_lsm);
142         if (rc < 0) {
143                 OBD_FREE(eco, sizeof (*eco));
144                 return NULL;
145         }
146
147         eco->eco_device = obd;
148         eco->eco_deleted = 0;
149         eco->eco_refcount = 0;
150         eco->eco_lsm->lsm_magic = LOV_MAGIC;
151         /* leave stripe count 0 by default */
152
153         return (eco);
154 }
155
156 static void
157 echo_free_object (struct ec_object *eco)
158 {
159         struct obd_device      *obd = eco->eco_device;
160         struct echo_client_obd *ec = &obd->u.echo_client;
161
162         LASSERT (eco->eco_refcount == 0);
163         obd_free_memmd(ec->ec_exp, &eco->eco_lsm);
164         OBD_FREE (eco, sizeof (*eco));
165 }
166
167 static int echo_create_object(struct obd_device *obd, int on_target,
168                               struct obdo *oa, void *ulsm, int ulsm_nob,
169                               struct obd_trans_info *oti)
170 {
171         struct echo_client_obd *ec = &obd->u.echo_client;
172         struct ec_object       *eco2;
173         struct ec_object       *eco;
174         struct lov_stripe_md   *lsm;
175         int                     rc;
176         int                     i, idx;
177
178         if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
179             (on_target ||                       /* set_stripe */
180              ec->ec_nstripes != 0)) {           /* LOV */
181                 CERROR ("No valid oid\n");
182                 return (-EINVAL);
183         }
184
185         if (ulsm != NULL) {
186                 eco = echo_allocate_object (obd);
187                 if (eco == NULL)
188                         return (-ENOMEM);
189
190                 lsm = eco->eco_lsm;
191
192                 rc = echo_copyin_lsm (obd, lsm, ulsm, ulsm_nob);
193                 if (rc != 0)
194                         goto failed;
195
196                 /* setup object ID here for !on_target and LOV hint */
197                 if ((oa->o_valid & OBD_MD_FLID) != 0)
198                         eco->eco_id = lsm->lsm_object_id = oa->o_id;
199
200                 if (lsm->lsm_stripe_count == 0)
201                         lsm->lsm_stripe_count = ec->ec_nstripes;
202
203                 if (lsm->lsm_stripe_size == 0)
204                         lsm->lsm_stripe_size = PAGE_SIZE;
205
206                 idx = ll_rand();
207
208                 /* setup stripes: indices + default ids if required */
209                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
210                         if (lsm->lsm_oinfo[i].loi_id == 0)
211                                 lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id;
212
213                         lsm->lsm_oinfo[i].loi_ost_idx =
214                                 (idx + i) % ec->ec_nstripes;
215                 }
216         } else {
217                 OBD_ALLOC(eco, sizeof(*eco));
218                 eco->eco_device = obd;
219                 lsm = NULL;
220         }
221
222         if (oa->o_id == 0)
223                 oa->o_id = ++last_object_id;
224
225         if (on_target) {
226                 oa->o_gr = FILTER_GROUP_ECHO;
227                 oa->o_valid |= OBD_MD_FLGROUP;
228
229                 rc = obd_create(ec->ec_exp, oa, &lsm, oti);
230                 if (rc != 0)
231                         goto failed;
232
233                 /* See what object ID we were given */
234                 eco->eco_id = oa->o_id = lsm->lsm_object_id;
235                 oa->o_valid |= OBD_MD_FLID;
236
237                 LASSERT(eco->eco_lsm == NULL || eco->eco_lsm == lsm);
238                 eco->eco_lsm = lsm;
239         }
240
241         spin_lock (&ec->ec_lock);
242
243         eco2 = echo_find_object_locked (obd, oa->o_id);
244         if (eco2 != NULL) {                     /* conflict */
245                 spin_unlock (&ec->ec_lock);
246
247                 CERROR ("Can't create object id "LPX64": id already exists%s\n",
248                         oa->o_id, on_target ? " (undoing create)" : "");
249
250                 if (on_target)
251                         obd_destroy(ec->ec_exp, oa, lsm, oti, NULL);
252
253                 rc = -EEXIST;
254                 goto failed;
255         }
256
257         list_add (&eco->eco_obj_chain, &ec->ec_objects);
258         spin_unlock (&ec->ec_lock);
259         CDEBUG (D_INFO,
260                 "created %p: "LPX64"=%u#%u@%u refs %d del %d\n",
261                 eco, eco->eco_id,
262                 eco->eco_lsm->lsm_stripe_size,
263                 eco->eco_lsm->lsm_stripe_count,
264                 eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
265                 eco->eco_refcount, eco->eco_deleted);
266         return (0);
267
268  failed:
269         echo_free_object (eco);
270         return (rc);
271 }
272
273 static int
274 echo_get_object (struct ec_object **ecop, struct obd_device *obd,
275                  struct obdo *oa)
276 {
277         struct echo_client_obd *ec = &obd->u.echo_client;
278         struct ec_object       *eco;
279         struct ec_object       *eco2;
280         int                     rc;
281
282         if ((oa->o_valid & OBD_MD_FLID) == 0 ||
283             oa->o_id == 0)                      /* disallow use of object id 0 */
284         {
285                 CERROR ("No valid oid\n");
286                 return (-EINVAL);
287         }
288
289         spin_lock (&ec->ec_lock);
290         eco = echo_find_object_locked (obd, oa->o_id);
291         if (eco != NULL) {
292                 if (eco->eco_deleted)           /* being deleted */
293                         return (-EAGAIN);       /* (see comment in cleanup) */
294
295                 eco->eco_refcount++;
296                 spin_unlock (&ec->ec_lock);
297                 *ecop = eco;
298                 CDEBUG (D_INFO,
299                         "found %p: "LPX64"=%u#%u@%u refs %d del %d\n",
300                         eco, eco->eco_id,
301                         eco->eco_lsm->lsm_stripe_size,
302                         eco->eco_lsm->lsm_stripe_count,
303                         eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
304                         eco->eco_refcount, eco->eco_deleted);
305                 return (0);
306         }
307         spin_unlock (&ec->ec_lock);
308
309         if (ec->ec_nstripes != 0)               /* striping required */
310                 return (-ENOENT);
311
312         eco = echo_allocate_object (obd);
313         if (eco == NULL)
314                 return (-ENOMEM);
315
316         eco->eco_id = eco->eco_lsm->lsm_object_id = oa->o_id;
317
318         spin_lock (&ec->ec_lock);
319
320         eco2 = echo_find_object_locked (obd, oa->o_id);
321         if (eco2 == NULL) {                     /* didn't race */
322                 list_add (&eco->eco_obj_chain, &ec->ec_objects);
323                 spin_unlock (&ec->ec_lock);
324                 eco->eco_refcount = 1;
325                 *ecop = eco;
326                 CDEBUG (D_INFO,
327                         "created %p: "LPX64"=%u#%u@%d refs %d del %d\n",
328                         eco, eco->eco_id,
329                         eco->eco_lsm->lsm_stripe_size,
330                         eco->eco_lsm->lsm_stripe_count,
331                         eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
332                         eco->eco_refcount, eco->eco_deleted);
333                 return (0);
334         }
335
336         if (eco2->eco_deleted)
337                 rc = -EAGAIN;                   /* lose race */
338         else {
339                 eco2->eco_refcount++;           /* take existing */
340                 *ecop = eco2;
341                 rc = 0;
342                 LASSERT (eco2->eco_id == eco2->eco_lsm->lsm_object_id);
343                 CDEBUG (D_INFO,
344                         "found(2) %p: "LPX64"=%u#%u@%d refs %d del %d\n",
345                         eco2, eco2->eco_id,
346                         eco2->eco_lsm->lsm_stripe_size,
347                         eco2->eco_lsm->lsm_stripe_count,
348                         eco2->eco_lsm->lsm_oinfo[0].loi_ost_idx,
349                         eco2->eco_refcount, eco2->eco_deleted);
350         }
351
352         spin_unlock (&ec->ec_lock);
353
354         echo_free_object (eco);
355         return (rc);
356 }
357
358 static void
359 echo_put_object (struct ec_object *eco)
360 {
361         struct obd_device      *obd = eco->eco_device;
362         struct echo_client_obd *ec = &obd->u.echo_client;
363
364         /* Release caller's ref on the object.
365          * delete => mark for deletion when last ref goes
366          */
367
368         spin_lock (&ec->ec_lock);
369
370         eco->eco_refcount--;
371         LASSERT (eco->eco_refcount >= 0);
372
373         CDEBUG(D_INFO, "put %p: "LPX64"=%u#%u@%d refs %d del %d\n",
374                eco, eco->eco_id,
375                eco->eco_lsm->lsm_stripe_size,
376                eco->eco_lsm->lsm_stripe_count,
377                eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
378                eco->eco_refcount, eco->eco_deleted);
379
380         if (eco->eco_refcount != 0 || !eco->eco_deleted) {
381                 spin_unlock (&ec->ec_lock);
382                 return;
383         }
384
385         spin_unlock (&ec->ec_lock);
386
387         /* NB leave obj in the object list.  We must prevent anyone from
388          * attempting to enqueue on this object number until we can be
389          * sure there will be no more lock callbacks.
390          */
391         obd_cancel_unused(ec->ec_exp, eco->eco_lsm, 0, NULL);
392
393         /* now we can let it go */
394         spin_lock (&ec->ec_lock);
395         list_del (&eco->eco_obj_chain);
396         spin_unlock (&ec->ec_lock);
397
398         LASSERT (eco->eco_refcount == 0);
399
400         echo_free_object (eco);
401 }
402
403 static void
404 echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
405 {
406         unsigned long stripe_count;
407         unsigned long stripe_size;
408         unsigned long width;
409         unsigned long woffset;
410         int           stripe_index;
411         obd_off       offset;
412
413         if (lsm->lsm_stripe_count <= 1)
414                 return;
415
416         offset       = *offp;
417         stripe_size  = lsm->lsm_stripe_size;
418         stripe_count = lsm->lsm_stripe_count;
419
420         /* width = # bytes in all stripes */
421         width = stripe_size * stripe_count;
422
423         /* woffset = offset within a width; offset = whole number of widths */
424         woffset = do_div (offset, width);
425
426         stripe_index = woffset / stripe_size;
427
428         *idp = lsm->lsm_oinfo[stripe_index].loi_id;
429         *offp = offset * stripe_size + woffset % stripe_size;
430 }
431
432 static void
433 echo_client_page_debug_setup(struct lov_stripe_md *lsm,
434                              struct page *page, int rw, obd_id id,
435                              obd_off offset, obd_off count)
436 {
437         char    *addr;
438         obd_off  stripe_off;
439         obd_id   stripe_id;
440         int      delta;
441
442         /* no partial pages on the client */
443         LASSERT(count == PAGE_SIZE);
444
445         addr = kmap(page);
446
447         for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
448                 if (rw == OBD_BRW_WRITE) {
449                         stripe_off = offset + delta;
450                         stripe_id = id;
451                         echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
452                 } else {
453                         stripe_off = 0xdeadbeef00c0ffeeULL;
454                         stripe_id = 0xdeadbeef00c0ffeeULL;
455                 }
456                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
457                                   stripe_off, stripe_id);
458         }
459
460         kunmap(page);
461 }
462
463 static int
464 echo_client_page_debug_check(struct lov_stripe_md *lsm,
465                              struct page *page, obd_id id,
466                              obd_off offset, obd_off count)
467 {
468         obd_off stripe_off;
469         obd_id  stripe_id;
470         char   *addr;
471         int     delta;
472         int     rc;
473         int     rc2;
474
475         /* no partial pages on the client */
476         LASSERT(count == PAGE_SIZE);
477
478         addr = kmap(page);
479
480         for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
481                 stripe_off = offset + delta;
482                 stripe_id = id;
483                 echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
484
485                 rc2 = block_debug_check("test_brw",
486                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
487                                         stripe_off, stripe_id);
488                 if (rc2 != 0) {
489                         CERROR ("Error in echo object "LPX64"\n", id);
490                         rc = rc2;
491                 }
492         }
493
494         kunmap(page);
495         return rc;
496 }
497
498 static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
499                             struct lov_stripe_md *lsm, obd_off offset,
500                             obd_size count, struct obd_trans_info *oti)
501 {
502         struct echo_client_obd *ec = &obd->u.echo_client;
503         obd_count               npages;
504         struct brw_page        *pga;
505         struct brw_page        *pgp;
506         obd_off                 off;
507         int                     i;
508         int                     rc;
509         int                     verify;
510         int                     gfp_mask;
511
512         verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
513                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
514                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
515
516         gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
517
518         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
519         LASSERT(lsm != NULL);
520         LASSERT(lsm->lsm_object_id == oa->o_id);
521
522         if (count <= 0 ||
523             (count & (PAGE_SIZE - 1)) != 0)
524                 return (-EINVAL);
525
526         /* XXX think again with misaligned I/O */
527         npages = count >> PAGE_SHIFT;
528
529         OBD_ALLOC(pga, npages * sizeof(*pga));
530         if (pga == NULL)
531                 return (-ENOMEM);
532
533         for (i = 0, pgp = pga, off = offset;
534              i < npages;
535              i++, pgp++, off += PAGE_SIZE) {
536
537                 LASSERT (pgp->pg == NULL);      /* for cleanup */
538
539                 rc = -ENOMEM;
540                 pgp->pg = alloc_pages (gfp_mask, 0);
541                 if (pgp->pg == NULL)
542                         goto out;
543
544                 pgp->count = PAGE_SIZE;
545                 pgp->off = off;
546                 pgp->flag = 0;
547
548                 if (verify)
549                         echo_client_page_debug_setup(lsm, pgp->pg, rw,
550                                                      oa->o_id, off, pgp->count);
551         }
552
553         rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti);
554
555  out:
556         if (rc != 0 || rw != OBD_BRW_READ)
557                 verify = 0;
558
559         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
560                 if (pgp->pg == NULL)
561                         continue;
562
563                 if (verify) {
564                         int vrc;
565                         vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id,
566                                                            pgp->off, pgp->count);
567                         if (vrc != 0 && rc == 0)
568                                 rc = vrc;
569                 }
570                 __free_pages(pgp->pg, 0);
571         }
572         OBD_FREE(pga, npages * sizeof(*pga));
573         return (rc);
574 }
575
#ifdef __KERNEL__
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/*
 * Bulk read/write straight from/to a user-space buffer (pre-2.5 kernels).
 *
 * The user buffer is mapped through a kiobuf and its pages are passed to
 * obd_brw().  Only whole, page-aligned pages are accepted.  Returns 0,
 * -EINVAL for bad arguments, -ENOMEM if the page array cannot be
 * allocated, or the error from the kiobuf calls or obd_brw().
 */
static int echo_client_ubrw(struct obd_device *obd, int rw,
                            struct obdo *oa, struct lov_stripe_md *lsm,
                            obd_off offset, obd_size count, char *buffer,
                            struct obd_trans_info *oti)
{
        struct echo_client_obd *ec = &obd->u.echo_client;
        struct kiobuf          *kiobuf;
        struct brw_page        *pga;
        struct brw_page        *pgp;
        obd_count               npages;
        obd_off                 off;
        int                     rc;
        int                     i;

        LASSERT (rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);

        /* NB: for now, only whole pages, page aligned */
        if (count <= 0)
                return (-EINVAL);
        if (((long)buffer & (PAGE_SIZE - 1)) != 0)
                return (-EINVAL);
        if ((count & (PAGE_SIZE - 1)) != 0)
                return (-EINVAL);
        if (lsm != NULL && lsm->lsm_object_id != oa->o_id)
                return (-EINVAL);

        /* XXX think again with misaligned I/O */
        npages = count >> PAGE_SHIFT;

        OBD_ALLOC(pga, npages * sizeof(*pga));
        if (pga == NULL)
                return (-ENOMEM);

        rc = alloc_kiovec (1, &kiobuf);
        if (rc != 0)
                goto free_pga;

        rc = map_user_kiobuf ((rw == OBD_BRW_READ) ? READ : WRITE,
                              kiobuf, (unsigned long)buffer, count);
        if (rc != 0)
                goto free_kiobuf;

        LASSERT (kiobuf->offset == 0);
        LASSERT (kiobuf->nr_pages == npages);

        /* describe each mapped user page to the bulk layer */
        pgp = pga;
        off = offset;
        for (i = 0; i < npages; i++) {
                pgp->off = off;
                pgp->pg = kiobuf->maplist[i];
                pgp->count = PAGE_SIZE;
                pgp->flag = 0;

                off += PAGE_SIZE;
                pgp++;
        }

        rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti);

        /* disabled in the original:
         *      if (rw == OBD_BRW_READ)
         *              mark_dirty_kiobuf (kiobuf, count);
         */

        unmap_kiobuf (kiobuf);
 free_kiobuf:
        free_kiovec (1, &kiobuf);
 free_pga:
        OBD_FREE(pga, npages * sizeof(*pga));
        return (rc);
}
#else
/* Placeholder for 2.6 kernels: user-buffer I/O is not implemented here, so
 * any call hits LBUG(). */
static int echo_client_ubrw(struct obd_device *obd, int rw,
                            struct obdo *oa, struct lov_stripe_md *lsm,
                            obd_off offset, obd_size count, char *buffer,
                            struct obd_trans_info *oti)
{
#warning "echo_client_ubrw() needs to be ported on 2.6 yet"
        LBUG();
        return 0;
}
#endif
#endif
655
656 struct echo_async_state;
657
658 #define EAP_MAGIC 79277927
659 struct echo_async_page {
660         int                     eap_magic;
661         struct page             *eap_page;
662         void                    *eap_cookie;
663         obd_off                 eap_off;
664         struct echo_async_state *eap_eas;
665         struct list_head        eap_item;
666 };
667
/* Convert an opaque callback cookie back into its echo_async_page,
 * asserting on the way that the magic number matches. */
#define EAP_FROM_COOKIE(c)                                                    \
        (LASSERT(((struct echo_async_page *)(c))->eap_magic == EAP_MAGIC),    \
         (struct echo_async_page *)(c))
671
672 struct echo_async_state {
673         spinlock_t              eas_lock;
674         obd_off                 eas_next_offset;
675         obd_off                 eas_end_offset;
676         int                     eas_in_flight;
677         int                     eas_rc;
678         wait_queue_head_t       eas_waitq;
679         struct list_head        eas_avail;
680         struct obdo             eas_oa;
681         struct lov_stripe_md    *eas_lsm;
682 };
683
684 static int eas_should_wake(struct echo_async_state *eas)
685 {
686         unsigned long flags;
687         int rc = 0;
688         spin_lock_irqsave(&eas->eas_lock, flags);
689         if (eas->eas_rc == 0 && !list_empty(&eas->eas_avail))
690             rc = 1;
691         spin_unlock_irqrestore(&eas->eas_lock, flags);
692         return rc;
693 };
694
/* ap_make_ready hook.  Echo pages are queued with ASYNC_READY, so reaching
 * this callback is treated as a bug. */
static int ec_ap_make_ready(void *data, int cmd)
{
        LBUG();         /* our pages are issued ready */
        return 0;
}
701 static int ec_ap_refresh_count(void *data, int cmd)
702 {
703         /* our pages are issued with a stable count */
704         LBUG();
705         return PAGE_SIZE;
706 }
707 static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
708 {
709         struct echo_async_page *eap = EAP_FROM_COOKIE(data);
710
711         memcpy(oa, &eap->eap_eas->eas_oa, sizeof(*oa));
712 }
713
714 static void ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
715 {
716         struct echo_async_page *eap = EAP_FROM_COOKIE(data);
717         struct echo_async_state *eas;
718         unsigned long flags;
719
720         eas = eap->eap_eas;
721
722         if (cmd == OBD_BRW_READ &&
723             eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID &&
724             (eas->eas_oa.o_valid & OBD_MD_FLFLAGS) != 0 &&
725             (eas->eas_oa.o_flags & OBD_FL_DEBUG_CHECK) != 0)
726                 echo_client_page_debug_check(eas->eas_lsm, eap->eap_page,
727                                              eas->eas_oa.o_id, eap->eap_off,
728                                              PAGE_SIZE);
729
730         spin_lock_irqsave(&eas->eas_lock, flags);
731         if (rc && !eas->eas_rc)
732                 eas->eas_rc = rc;
733         eas->eas_in_flight--;
734         list_add(&eap->eap_item, &eas->eas_avail);
735         wake_up(&eas->eas_waitq);
736         spin_unlock_irqrestore(&eas->eas_lock, flags);
737 }
738
739 static struct obd_async_page_ops ec_async_page_ops = {
740         .ap_make_ready =        ec_ap_make_ready,
741         .ap_refresh_count =     ec_ap_refresh_count,
742         .ap_fill_obdo =         ec_ap_fill_obdo,
743         .ap_completion =        ec_ap_completion,
744 };
745
746 static int echo_client_async_page(struct obd_export *exp, int rw,
747                                    struct obdo *oa, struct lov_stripe_md *lsm,
748                                    obd_off offset, obd_size count,
749                                    obd_size batching)
750 {
751         obd_count npages, i;
752         struct echo_async_page *eap;
753         struct echo_async_state eas;
754         struct list_head *pos, *n;
755         int rc = 0;
756         unsigned long flags;
757         LIST_HEAD(pages);
758 #if 0
759         int                     verify;
760         int                     gfp_mask;
761
762         verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
763                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
764                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
765
766         gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
767 #endif
768
769         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
770
771         if (count <= 0 ||
772             (count & (PAGE_SIZE - 1)) != 0 ||
773             (lsm != NULL &&
774              lsm->lsm_object_id != oa->o_id))
775                 return (-EINVAL);
776
777         /* XXX think again with misaligned I/O */
778         npages = batching >> PAGE_SHIFT;
779
780         memcpy(&eas.eas_oa, oa, sizeof(*oa));
781         eas.eas_next_offset = offset;
782         eas.eas_end_offset = offset + count;
783         spin_lock_init(&eas.eas_lock);
784         init_waitqueue_head(&eas.eas_waitq);
785         eas.eas_in_flight = 0;
786         eas.eas_rc = 0;
787         eas.eas_lsm = lsm;
788         INIT_LIST_HEAD(&eas.eas_avail);
789
790         /* prepare the group of pages that we're going to be keeping
791          * in flight */
792         for (i = 0; i < npages; i++) {
793                 struct page *page = alloc_page(GFP_KERNEL);
794                 if (page == NULL)
795                         GOTO(out, rc = -ENOMEM);
796
797                 page->private = 0;
798                 list_add_tail(&PAGE_LIST(page), &pages);
799
800                 OBD_ALLOC(eap, sizeof(*eap));
801                 if (eap == NULL)
802                         GOTO(out, rc = -ENOMEM);
803
804                 eap->eap_magic = EAP_MAGIC;
805                 eap->eap_page = page;
806                 eap->eap_eas = &eas;
807                 page->private = (unsigned long)eap;
808                 list_add_tail(&eap->eap_item, &eas.eas_avail);
809         }
810
811         /* first we spin queueing io and being woken by its completion */
812         spin_lock_irqsave(&eas.eas_lock, flags);
813         for(;;) {
814                 int rc;
815
816                 /* sleep until we have a page to send */
817                 spin_unlock_irqrestore(&eas.eas_lock, flags);
818                 rc = wait_event_interruptible(eas.eas_waitq,
819                                               eas_should_wake(&eas));
820                 spin_lock_irqsave(&eas.eas_lock, flags);
821                 if (rc && !eas.eas_rc)
822                         eas.eas_rc = rc;
823                 if (eas.eas_rc)
824                         break;
825                 if (list_empty(&eas.eas_avail))
826                         continue;
827                 eap = list_entry(eas.eas_avail.next, struct echo_async_page,
828                                  eap_item);
829                 list_del(&eap->eap_item);
830                 spin_unlock_irqrestore(&eas.eas_lock, flags);
831
832                 /* unbind the eap from its old page offset */
833                 if (eap->eap_cookie != NULL) {
834                         obd_teardown_async_page(exp, lsm, NULL,
835                                                 eap->eap_cookie);
836                         eap->eap_cookie = NULL;
837                 }
838
839                 eas.eas_next_offset += PAGE_SIZE;
840                 eap->eap_off = eas.eas_next_offset;
841
842                 rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
843                                          eap->eap_off, &ec_async_page_ops,
844                                          eap, &eap->eap_cookie);
845                 if (rc) {
846                         spin_lock_irqsave(&eas.eas_lock, flags);
847                         eas.eas_rc = rc;
848                         break;
849                 }
850
851                 if (oa->o_id != ECHO_PERSISTENT_OBJID &&
852                     (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
853                     (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0)
854                         echo_client_page_debug_setup(lsm, eap->eap_page, rw,
855                                                      oa->o_id,
856                                                      eap->eap_off, PAGE_SIZE);
857
858                 /* always asserts urgent, which isn't quite right */
859                 rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie,
860                                         rw, 0, PAGE_SIZE, 0,
861                                         ASYNC_READY | ASYNC_URGENT |
862                                         ASYNC_COUNT_STABLE);
863                 spin_lock_irqsave(&eas.eas_lock, flags);
864                 if (rc && !eas.eas_rc) {
865                         eas.eas_rc = rc;
866                         break;
867                 }
868                 eas.eas_in_flight++;
869                 if (eas.eas_next_offset == eas.eas_end_offset)
870                         break;
871         }
872
873         /* still hold the eas_lock here.. */
874
875         /* now we just spin waiting for all the rpcs to complete */
876         while(eas.eas_in_flight) {
877                 spin_unlock_irqrestore(&eas.eas_lock, flags);
878                 wait_event_interruptible(eas.eas_waitq,
879                                          eas.eas_in_flight == 0);
880                 spin_lock_irqsave(&eas.eas_lock, flags);
881         }
882         spin_unlock_irqrestore(&eas.eas_lock, flags);
883
884 out:
885         list_for_each_safe(pos, n, &pages) {
886                 struct page *page = list_entry(pos, struct page,
887                                                PAGE_LIST_ENTRY);
888
889                 list_del(&PAGE_LIST(page));
890                 if (page->private != 0) {
891                         eap = (struct echo_async_page *)page->private;
892                         if (eap->eap_cookie != NULL)
893                                 obd_teardown_async_page(exp, lsm, NULL,
894                                                         eap->eap_cookie);
895                         OBD_FREE(eap, sizeof(*eap));
896                 }
897                 __free_page(page);
898         }
899
900         RETURN(rc);
901 }
902
903 static int echo_client_prep_commit(struct obd_export *exp, int rw,
904                                    struct obdo *oa, struct lov_stripe_md *lsm,
905                                    obd_off offset, obd_size count,
906                                    obd_size batch, struct obd_trans_info *oti)
907 {
908         struct obd_ioobj ioo;
909         struct niobuf_local *lnb;
910         struct niobuf_remote *rnb;
911         obd_off off;
912         obd_size npages, tot_pages;
913         int i, ret = 0;
914         ENTRY;
915
916         if (count <= 0 || (count & (PAGE_SIZE - 1)) != 0 ||
917             (lsm != NULL && lsm->lsm_object_id != oa->o_id))
918                 RETURN(-EINVAL);
919
920         npages = batch >> PAGE_SHIFT;
921         tot_pages = count >> PAGE_SHIFT;
922
923         OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
924         OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
925
926         if (lnb == NULL || rnb == NULL)
927                 GOTO(out, ret = -ENOMEM);
928
929         obdo_to_ioobj(oa, &ioo);
930
931         off = offset;
932
933         for(; tot_pages; tot_pages -= npages) {
934                 if (tot_pages < npages)
935                         npages = tot_pages;
936
937                 for (i = 0; i < npages; i++, off += PAGE_SIZE) {
938                         rnb[i].offset = off;
939                         rnb[i].len = PAGE_SIZE;
940                 }
941
942                 ioo.ioo_bufcnt = npages;
943                 oti->oti_transno = 0;
944
945                 ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti);
946                 if (ret != 0)
947                         GOTO(out, ret);
948
949                 for (i = 0; i < npages; i++) {
950                         struct page *page = lnb[i].page;
951
952                         /* read past eof? */
953                         if (page == NULL && lnb[i].rc == 0)
954                                 continue;
955
956                         if (oa->o_id == ECHO_PERSISTENT_OBJID ||
957                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
958                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
959                                 continue;
960
961                         if (rw == OBD_BRW_WRITE)
962                                 echo_client_page_debug_setup(lsm, page, rw,
963                                                              oa->o_id,
964                                                              rnb[i].offset,
965                                                              rnb[i].len);
966                         else
967                                 echo_client_page_debug_check(lsm, page,
968                                                              oa->o_id,
969                                                              rnb[i].offset,
970                                                              rnb[i].len);
971                 }
972
973                 ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
974                 if (ret != 0)
975                         GOTO(out, ret);
976         }
977
978 out:
979         if (lnb)
980                 OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
981         if (rnb)
982                 OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
983         RETURN(ret);
984 }
985
986 int echo_client_brw_ioctl(int rw, struct obd_export *exp,
987                           struct obd_ioctl_data *data)
988 {
989         struct obd_device *obd = class_exp2obd(exp);
990         struct echo_client_obd *ec = &obd->u.echo_client;
991         struct obd_trans_info dummy_oti = { .oti_thread_id = -1 };
992         struct ec_object *eco;
993         int rc;
994         ENTRY;
995
996         rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
997         if (rc)
998                 RETURN(rc);
999
1000         data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE;
1001         data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP;
1002         data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO;
1003
1004         switch((long)data->ioc_pbuf1) {
1005         case 1:
1006                 if (data->ioc_pbuf2 == NULL) { // NULL user data pointer
1007                         rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
1008                                               eco->eco_lsm, data->ioc_offset,
1009                                               data->ioc_count, &dummy_oti);
1010                 } else {
1011 #ifdef __KERNEL__
1012                         rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
1013                                               eco->eco_lsm, data->ioc_offset,
1014                                               data->ioc_count, data->ioc_pbuf2,
1015                                               &dummy_oti);
1016 #endif
1017                 }
1018                 break;
1019         case 2:
1020                 rc = echo_client_async_page(ec->ec_exp, rw, &data->ioc_obdo1,
1021                                            eco->eco_lsm, data->ioc_offset,
1022                                            data->ioc_count, data->ioc_plen1);
1023                 break;
1024         case 3:
1025                 rc = echo_client_prep_commit(ec->ec_exp, rw, &data->ioc_obdo1,
1026                                             eco->eco_lsm, data->ioc_offset,
1027                                             data->ioc_count, data->ioc_plen1,
1028                                             &dummy_oti);
1029                 break;
1030         default:
1031                 rc = -EINVAL;
1032         }
1033         echo_put_object(eco);
1034         RETURN(rc);
1035 }
1036
1037 static int
1038 echo_ldlm_callback (struct ldlm_lock *lock, struct ldlm_lock_desc *new,
1039                     void *data, int flag)
1040 {
1041         struct ec_object       *eco = (struct ec_object *)data;
1042         struct echo_client_obd *ec = &(eco->eco_device->u.echo_client);
1043         struct lustre_handle    lockh;
1044         struct list_head       *el;
1045         int                     found = 0;
1046         int                     rc;
1047
1048         ldlm_lock2handle (lock, &lockh);
1049
1050         /* #ifdef this out if we're not feeling paranoid */
1051         spin_lock (&ec->ec_lock);
1052         list_for_each (el, &ec->ec_objects) {
1053                 found = (eco == list_entry(el, struct ec_object,
1054                                            eco_obj_chain));
1055                 if (found)
1056                         break;
1057         }
1058         spin_unlock (&ec->ec_lock);
1059         LASSERT (found);
1060
1061         switch (flag) {
1062         case LDLM_CB_BLOCKING:
1063                 CDEBUG(D_INFO, "blocking callback on "LPX64", handle "LPX64"\n",
1064                        eco->eco_id, lockh.cookie);
1065                 rc = ldlm_cli_cancel (&lockh);
1066                 if (rc != ELDLM_OK)
1067                         CERROR ("ldlm_cli_cancel failed: %d\n", rc);
1068                 break;
1069
1070         case LDLM_CB_CANCELING:
1071                 CDEBUG(D_INFO, "cancel callback on "LPX64", handle "LPX64"\n",
1072                        eco->eco_id, lockh.cookie);
1073                 break;
1074
1075         default:
1076                 LBUG ();
1077         }
1078
1079         return (0);
1080 }
1081
1082 static int
1083 echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
1084                     int mode, obd_off offset, obd_size nob)
1085 {
1086         struct obd_device      *obd = exp->exp_obd;
1087         struct echo_client_obd *ec = &obd->u.echo_client;
1088         struct lustre_handle   *ulh = obdo_handle (oa);
1089         struct ec_object       *eco;
1090         struct ec_lock         *ecl;
1091         int                     flags;
1092         int                     rc;
1093
1094         if (!(mode == LCK_PR || mode == LCK_PW))
1095                 return -EINVAL;
1096
1097         if ((offset & (PAGE_SIZE - 1)) != 0 ||
1098             (nob & (PAGE_SIZE - 1)) != 0)
1099                 return -EINVAL;
1100
1101         rc = echo_get_object (&eco, obd, oa);
1102         if (rc != 0)
1103                 return rc;
1104
1105         rc = -ENOMEM;
1106         OBD_ALLOC (ecl, sizeof (*ecl));
1107         if (ecl == NULL)
1108                 goto failed_0;
1109
1110         ecl->ecl_mode = mode;
1111         ecl->ecl_object = eco;
1112         ecl->ecl_policy.l_extent.start = offset;
1113         ecl->ecl_policy.l_extent.end =
1114                 (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
1115
1116         flags = 0;
1117         rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, LDLM_EXTENT,
1118                          &ecl->ecl_policy, mode, &flags, echo_ldlm_callback,
1119                          ldlm_completion_ast, NULL, eco, sizeof(struct ost_lvb),
1120                          lustre_swab_ost_lvb, &ecl->ecl_lock_handle);
1121         if (rc != 0)
1122                 goto failed_1;
1123
1124         CDEBUG(D_INFO, "enqueue handle "LPX64"\n", ecl->ecl_lock_handle.cookie);
1125
1126         /* NB ecl takes object ref from echo_get_object() above */
1127         spin_lock(&ec->ec_lock);
1128
1129         list_add(&ecl->ecl_exp_chain, &exp->exp_ec_data.eced_locks);
1130         ulh->cookie = ecl->ecl_cookie = ec->ec_unique++;
1131
1132         spin_unlock(&ec->ec_lock);
1133
1134         oa->o_valid |= OBD_MD_FLHANDLE;
1135         return 0;
1136
1137  failed_1:
1138         OBD_FREE (ecl, sizeof (*ecl));
1139  failed_0:
1140         echo_put_object (eco);
1141         return (rc);
1142 }
1143
1144 static int
1145 echo_client_cancel(struct obd_export *exp, struct obdo *oa)
1146 {
1147         struct obd_device      *obd = exp->exp_obd;
1148         struct echo_client_obd *ec = &obd->u.echo_client;
1149         struct lustre_handle   *ulh = obdo_handle (oa);
1150         struct ec_lock         *ecl = NULL;
1151         int                     found = 0;
1152         struct list_head       *el;
1153         int                     rc;
1154
1155         if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
1156                 return -EINVAL;
1157
1158         spin_lock (&ec->ec_lock);
1159
1160         list_for_each (el, &exp->exp_ec_data.eced_locks) {
1161                 ecl = list_entry (el, struct ec_lock, ecl_exp_chain);
1162                 found = (ecl->ecl_cookie == ulh->cookie);
1163                 if (found) {
1164                         list_del (&ecl->ecl_exp_chain);
1165                         break;
1166                 }
1167         }
1168
1169         spin_unlock (&ec->ec_lock);
1170
1171         if (!found)
1172                 return (-ENOENT);
1173
1174         rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm, ecl->ecl_mode,
1175                         &ecl->ecl_lock_handle);
1176
1177         echo_put_object (ecl->ecl_object);
1178         OBD_FREE (ecl, sizeof (*ecl));
1179
1180         return rc;
1181 }
1182
1183 static int
1184 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
1185                       int len, void *karg, void *uarg)
1186 {
1187         struct obd_device      *obd;
1188         struct echo_client_obd *ec;
1189         struct ec_object       *eco;
1190         struct obd_ioctl_data  *data = karg;
1191         struct obd_trans_info   dummy_oti;
1192         struct oti_req_ack_lock *ack_lock;
1193         struct obdo            *oa;
1194         int                     rw = OBD_BRW_READ;
1195         int                     rc = 0;
1196         int                     i;
1197         ENTRY;
1198
1199         unlock_kernel();
1200
1201         memset(&dummy_oti, 0, sizeof(dummy_oti));
1202
1203         obd = exp->exp_obd;
1204         ec = &obd->u.echo_client;
1205
1206         switch (cmd) {
1207         case OBD_IOC_CREATE:                    /* may create echo object */
1208                 if (!capable (CAP_SYS_ADMIN))
1209                         GOTO (out, rc = -EPERM);
1210
1211                 rc = echo_create_object (obd, 1, &data->ioc_obdo1,
1212                                          data->ioc_pbuf1, data->ioc_plen1,
1213                                          &dummy_oti);
1214                 GOTO(out, rc);
1215
1216         case OBD_IOC_DESTROY:
1217                 if (!capable (CAP_SYS_ADMIN))
1218                         GOTO (out, rc = -EPERM);
1219
1220                 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
1221                 if (rc == 0) {
1222                         oa = &data->ioc_obdo1;
1223                         oa->o_gr = FILTER_GROUP_ECHO;
1224                         oa->o_valid |= OBD_MD_FLGROUP;
1225                         rc = obd_destroy(ec->ec_exp, oa, eco->eco_lsm,
1226                                          &dummy_oti, NULL);
1227                         if (rc == 0)
1228                                 eco->eco_deleted = 1;
1229                         echo_put_object(eco);
1230                 }
1231                 GOTO(out, rc);
1232
1233         case OBD_IOC_GETATTR:
1234                 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
1235                 if (rc == 0) {
1236                         rc = obd_getattr(ec->ec_exp, &data->ioc_obdo1,
1237                                          eco->eco_lsm);
1238                         echo_put_object(eco);
1239                 }
1240                 GOTO(out, rc);
1241
1242         case OBD_IOC_SETATTR:
1243                 if (!capable (CAP_SYS_ADMIN))
1244                         GOTO (out, rc = -EPERM);
1245
1246                 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
1247                 if (rc == 0) {
1248                         rc = obd_setattr(ec->ec_exp, &data->ioc_obdo1,
1249                                          eco->eco_lsm, NULL);
1250                         echo_put_object(eco);
1251                 }
1252                 GOTO(out, rc);
1253
1254         case OBD_IOC_BRW_WRITE:
1255                 if (!capable (CAP_SYS_ADMIN))
1256                         GOTO (out, rc = -EPERM);
1257
1258                 rw = OBD_BRW_WRITE;
1259                 /* fall through */
1260         case OBD_IOC_BRW_READ:
1261                 rc = echo_client_brw_ioctl(rw, exp, data);
1262                 GOTO(out, rc);
1263
1264         case ECHO_IOC_GET_STRIPE:
1265                 rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
1266                 if (rc == 0) {
1267                         rc = echo_copyout_lsm(eco->eco_lsm, data->ioc_pbuf1,
1268                                               data->ioc_plen1);
1269                         echo_put_object(eco);
1270                 }
1271                 GOTO(out, rc);
1272
1273         case ECHO_IOC_SET_STRIPE:
1274                 if (!capable (CAP_SYS_ADMIN))
1275                         GOTO (out, rc = -EPERM);
1276
1277                 if (data->ioc_pbuf1 == NULL) {  /* unset */
1278                         rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
1279                         if (rc == 0) {
1280                                 eco->eco_deleted = 1;
1281                                 echo_put_object(eco);
1282                         }
1283                 } else {
1284                         rc = echo_create_object(obd, 0, &data->ioc_obdo1,
1285                                                 data->ioc_pbuf1,
1286                                                 data->ioc_plen1, &dummy_oti);
1287                 }
1288                 GOTO (out, rc);
1289
1290         case ECHO_IOC_ENQUEUE:
1291                 if (!capable (CAP_SYS_ADMIN))
1292                         GOTO (out, rc = -EPERM);
1293
1294                 rc = echo_client_enqueue(exp, &data->ioc_obdo1,
1295                                          data->ioc_conn1, /* lock mode */
1296                                    data->ioc_offset, data->ioc_count);/*extent*/
1297                 GOTO (out, rc);
1298
1299         case ECHO_IOC_CANCEL:
1300                 rc = echo_client_cancel(exp, &data->ioc_obdo1);
1301                 GOTO (out, rc);
1302
1303         default:
1304                 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
1305                 GOTO (out, rc = -ENOTTY);
1306         }
1307
1308         EXIT;
1309  out:
1310
1311         /* XXX this should be in a helper also called by target_send_reply */
1312         for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
1313              i++, ack_lock++) {
1314                 if (!ack_lock->mode)
1315                         break;
1316                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
1317         }
1318
1319         lock_kernel();
1320
1321         return rc;
1322 }
1323
1324 static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
1325 {
1326         struct echo_client_obd *ec = &obddev->u.echo_client;
1327         struct obd_device *tgt;
1328         struct lustre_handle conn = {0, };
1329         struct obd_uuid echo_uuid = { "ECHO_UUID" };
1330         struct obd_connect_data *ocd = NULL;
1331         int rc;
1332         ENTRY;
1333
1334         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1335                 CERROR("requires a TARGET OBD name\n");
1336                 RETURN(-EINVAL);
1337         }
1338
1339         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
1340         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
1341                 CERROR("device not attached or not set up (%s)\n",
1342                        lustre_cfg_string(lcfg, 1));
1343                 RETURN(-EINVAL);
1344         }
1345
1346         spin_lock_init (&ec->ec_lock);
1347         INIT_LIST_HEAD (&ec->ec_objects);
1348         ec->ec_unique = 0;
1349
1350         OBD_ALLOC(ocd, sizeof(*ocd));
1351         if (ocd == NULL) {
1352                 CERROR("Can't alloc ocd connecting to %s\n",
1353                        lustre_cfg_string(lcfg, 1));
1354                 return -ENOMEM;
1355         }
1356
1357         ocd->ocd_version = LUSTRE_VERSION_CODE;
1358
1359         rc = obd_connect(&conn, tgt, &echo_uuid, ocd);
1360
1361         OBD_FREE(ocd, sizeof(*ocd));
1362
1363         if (rc != 0) {
1364                 CERROR("fail to connect to device %s\n",
1365                        lustre_cfg_string(lcfg, 1));
1366                 return (rc);
1367         }
1368         ec->ec_exp = class_conn2export(&conn);
1369
1370         RETURN(rc);
1371 }
1372
1373 static int echo_client_cleanup(struct obd_device *obddev)
1374 {
1375         struct list_head       *el;
1376         struct ec_object       *eco;
1377         struct echo_client_obd *ec = &obddev->u.echo_client;
1378         int rc;
1379         ENTRY;
1380
1381         if (!list_empty(&obddev->obd_exports)) {
1382                 CERROR("still has clients!\n");
1383                 RETURN(-EBUSY);
1384         }
1385
1386         /* XXX assuming sole access */
1387         while (!list_empty(&ec->ec_objects)) {
1388                 el = ec->ec_objects.next;
1389                 eco = list_entry(el, struct ec_object, eco_obj_chain);
1390
1391                 LASSERT(eco->eco_refcount == 0);
1392                 eco->eco_refcount = 1;
1393                 eco->eco_deleted = 1;
1394                 echo_put_object(eco);
1395         }
1396
1397         rc = obd_disconnect(ec->ec_exp);
1398         if (rc != 0)
1399                 CERROR("fail to disconnect device: %d\n", rc);
1400
1401         RETURN(rc);
1402 }
1403
1404 static int echo_client_connect(struct lustre_handle *conn,
1405                                struct obd_device *src, struct obd_uuid *cluuid,
1406                                struct obd_connect_data *data)
1407 {
1408         struct obd_export *exp;
1409         int                rc;
1410
1411         rc = class_connect(conn, src, cluuid);
1412         if (rc == 0) {
1413                 exp = class_conn2export(conn);
1414                 INIT_LIST_HEAD(&exp->exp_ec_data.eced_locks);
1415                 class_export_put(exp);
1416         }
1417
1418         RETURN (rc);
1419 }
1420
1421 static int echo_client_disconnect(struct obd_export *exp)
1422 {
1423         struct obd_device      *obd;
1424         struct echo_client_obd *ec;
1425         struct ec_lock         *ecl;
1426         int                     rc;
1427         ENTRY;
1428
1429         if (exp == NULL)
1430                 GOTO(out, rc = -EINVAL);
1431
1432         obd = exp->exp_obd;
1433         ec = &obd->u.echo_client;
1434
1435         /* no more contention on export's lock list */
1436         while (!list_empty (&exp->exp_ec_data.eced_locks)) {
1437                 ecl = list_entry (exp->exp_ec_data.eced_locks.next,
1438                                   struct ec_lock, ecl_exp_chain);
1439                 list_del (&ecl->ecl_exp_chain);
1440
1441                 rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
1442                                  ecl->ecl_mode, &ecl->ecl_lock_handle);
1443
1444                 CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
1445                         "(%d)\n", ecl->ecl_object->eco_id, rc);
1446
1447                 echo_put_object (ecl->ecl_object);
1448                 OBD_FREE (ecl, sizeof (*ecl));
1449         }
1450
1451         rc = class_disconnect(exp);
1452         GOTO(out, rc);
1453  out:
1454         return rc;
1455 }
1456
1457 static struct obd_ops echo_obd_ops = {
1458         .o_owner       = THIS_MODULE,
1459         .o_setup       = echo_client_setup,
1460         .o_cleanup     = echo_client_cleanup,
1461         .o_iocontrol   = echo_client_iocontrol,
1462         .o_connect     = echo_client_connect,
1463         .o_disconnect  = echo_client_disconnect
1464 };
1465
1466 int echo_client_init(void)
1467 {
1468         struct lprocfs_static_vars lvars;
1469
1470         lprocfs_init_vars(echo, &lvars);
1471         return class_register_type(&echo_obd_ops, lvars.module_vars,
1472                                    OBD_ECHO_CLIENT_DEVICENAME, NULL);
1473 }
1474
1475 void echo_client_exit(void)
1476 {
1477         class_unregister_type(OBD_ECHO_CLIENT_DEVICENAME);
1478 }