Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define DEBUG_SUBSYSTEM S_ECHO
23 #ifdef __KERNEL__
24 #include <linux/version.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
28 #include <linux/iobuf.h>
29 #endif
30 #include <asm/div64.h>
31 #else
32 #include <liblustre.h>
33 #endif
34
35 #include <linux/obd.h>
36 #include <linux/obd_support.h>
37 #include <linux/obd_class.h>
38 #include <linux/obd_echo.h>
39 #include <linux/lustre_debug.h>
40 #include <linux/lprocfs_status.h>
41 #include <linux/lustre_lite.h>                  /* for LL_IOC_LOV_SETSTRIPE */
42
#if 0
/*
 * Debugging aid, currently compiled out.
 *
 * Prints one summary line for the echo object 'eco' (prefixed with 'msg')
 * describing the object and its stripe descriptor, then one line per stripe
 * giving the stripe's OST index and object id.
 */
static void
echo_printk_object(char *msg, struct ec_object *eco)
{
        struct lov_stripe_md *md = eco->eco_lsm;
        const char           *state = eco->eco_deleted ? "(deleted) " : "";
        int                   n = 0;

        printk(KERN_INFO "%s: object %p: "LPX64", refs %d%s: "LPX64
               "=%u!%u@%d\n", msg, eco, eco->eco_id, eco->eco_refcount,
               state,
               md->lsm_object_id, md->lsm_stripe_size,
               md->lsm_stripe_count, md->lsm_stripe_offset);

        while (n < md->lsm_stripe_count) {
                printk(KERN_INFO "   [%2u]"LPX64"\n",
                       md->lsm_oinfo[n].loi_ost_idx,
                       md->lsm_oinfo[n].loi_id);
                n++;
        }
}
#endif
62
63 static struct ec_object *
64 echo_find_object_locked (struct obd_device *obd, obd_id id)
65 {
66         struct echo_client_obd *ec = &obd->u.echo_client;
67         struct ec_object       *eco = NULL;
68         struct list_head       *el;
69
70         list_for_each (el, &ec->ec_objects) {
71                 eco = list_entry (el, struct ec_object, eco_obj_chain);
72
73                 if (eco->eco_id == id)
74                         return (eco);
75         }
76         return (NULL);
77 }
78
79 static int
80 echo_copyout_lsm (struct lov_stripe_md *lsm, void *ulsm, int ulsm_nob)
81 {
82         int nob;
83
84         nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
85         if (nob > ulsm_nob)
86                 return (-EINVAL);
87
88         if (copy_to_user (ulsm, lsm, nob))
89                 return (-EFAULT);
90
91         return (0);
92 }
93
94 static int
95 echo_copyin_lsm (struct obd_device *obd, struct lov_stripe_md *lsm,
96                  void *ulsm, int ulsm_nob)
97 {
98         struct echo_client_obd *ec = &obd->u.echo_client;
99         int                     nob;
100
101         if (ulsm_nob < sizeof (*lsm))
102                 return (-EINVAL);
103
104         if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
105                 return (-EFAULT);
106
107         nob = lsm->lsm_stripe_count * sizeof (lsm->lsm_oinfo[0]);
108
109         if (ulsm_nob < nob ||
110             lsm->lsm_stripe_count > ec->ec_nstripes ||
111             lsm->lsm_magic != LOV_MAGIC ||
112             (lsm->lsm_stripe_offset != 0 &&
113              lsm->lsm_stripe_offset != 0xffffffff &&
114              lsm->lsm_stripe_offset >= ec->ec_nstripes) ||
115             (lsm->lsm_stripe_size & (PAGE_SIZE - 1)) != 0 ||
116             ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
117                 return (-EINVAL);
118
119         LASSERT (ec->ec_lsmsize >= sizeof (*lsm) + nob);
120
121         if (copy_from_user(lsm->lsm_oinfo,
122                            ((struct lov_stripe_md *)ulsm)->lsm_oinfo, nob))
123                 return (-EFAULT);
124
125         return (0);
126 }
127
128 static struct ec_object *
129 echo_allocate_object (struct obd_device *obd)
130 {
131         struct echo_client_obd *ec = &obd->u.echo_client;
132         struct ec_object       *eco;
133
134         OBD_ALLOC (eco, sizeof (*eco));
135         if (eco == NULL)
136                 return (NULL);
137
138         OBD_ALLOC (eco->eco_lsm, ec->ec_lsmsize);
139         if (eco->eco_lsm == NULL) {
140                 OBD_FREE (eco, sizeof (*eco));
141                 return (NULL);
142         }
143
144         eco->eco_device = obd;
145         eco->eco_deleted = 0;
146         eco->eco_refcount = 0;
147         eco->eco_lsm->lsm_magic = LOV_MAGIC;
148         /* leave stripe count 0 by default */
149
150         return (eco);
151 }
152
153 static void
154 echo_free_object (struct ec_object *eco)
155 {
156         struct obd_device      *obd = eco->eco_device;
157         struct echo_client_obd *ec = &obd->u.echo_client;
158
159         LASSERT (eco->eco_refcount == 0);
160         OBD_FREE (eco->eco_lsm, ec->ec_lsmsize);
161         OBD_FREE (eco, sizeof (*eco));
162 }
163
164 static int
165 echo_create_object (struct obd_device *obd, int on_target, struct obdo *oa,
166                     void *ulsm, int ulsm_nob)
167 {
168         struct echo_client_obd *ec = &obd->u.echo_client;
169         struct ec_object       *eco2;
170         struct ec_object       *eco;
171         struct lov_stripe_md   *lsm;
172         int                     rc;
173         int                     i;
174
175         if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
176             (on_target ||                       /* set_stripe */
177              ec->ec_nstripes != 0)) {           /* LOV */
178                 CERROR ("No valid oid\n");
179                 return (-EINVAL);
180         }
181
182         eco = echo_allocate_object (obd);
183         if (eco == NULL)
184                 return (-ENOMEM);
185
186         lsm = eco->eco_lsm;
187
188         if (ulsm != NULL) {
189                 rc = echo_copyin_lsm (obd, lsm, ulsm, ulsm_nob);
190                 if (rc != 0)
191                         goto failed;
192         }
193
194         /* setup object ID here for !on_target and LOV hint */
195         if ((oa->o_valid & OBD_MD_FLID) != 0)
196                 eco->eco_id = lsm->lsm_object_id = oa->o_id;
197
198         /* defaults -> actual values */
199         if (lsm->lsm_stripe_offset == 0xffffffff)
200                 lsm->lsm_stripe_offset = 0;
201
202         if (lsm->lsm_stripe_count == 0)
203                 lsm->lsm_stripe_count = ec->ec_nstripes;
204
205         if (lsm->lsm_stripe_size == 0)
206                 lsm->lsm_stripe_size = PAGE_SIZE;
207
208         /* setup stripes: indices + default ids if required */
209         for (i = 0; i < lsm->lsm_stripe_count; i++) {
210                 if (lsm->lsm_oinfo[i].loi_id == 0)
211                         lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id;
212
213                 lsm->lsm_oinfo[i].loi_ost_idx =
214                         (lsm->lsm_stripe_offset + i) % ec->ec_nstripes;
215         }
216
217         if (on_target) {
218                 rc = obd_create (&ec->ec_conn, oa, &lsm, NULL);
219                 if (rc != 0)
220                         goto failed;
221
222                 /* See what object ID we were given */
223                 LASSERT ((oa->o_valid & OBD_MD_FLID) != 0);
224                 eco->eco_id = lsm->lsm_object_id = oa->o_id;
225         }
226
227         spin_lock (&ec->ec_lock);
228
229         eco2 = echo_find_object_locked (obd, oa->o_id);
230         if (eco2 != NULL) {                     /* conflict */
231                 spin_unlock (&ec->ec_lock);
232
233                 CERROR ("Can't create object id "LPX64": id already exists%s\n",
234                         oa->o_id, on_target ? " (undoing create)" : "");
235
236                 if (on_target)
237                         obd_destroy (&ec->ec_conn, oa, lsm, NULL);
238
239                 rc = -EEXIST;
240                 goto failed;
241         }
242
243         list_add (&eco->eco_obj_chain, &ec->ec_objects);
244         spin_unlock (&ec->ec_lock);
245         CDEBUG (D_INFO,
246                 "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
247                 eco, eco->eco_id,
248                 eco->eco_lsm->lsm_stripe_size,
249                 eco->eco_lsm->lsm_stripe_count,
250                 eco->eco_lsm->lsm_stripe_offset,
251                 eco->eco_refcount, eco->eco_deleted);
252         return (0);
253
254  failed:
255         echo_free_object (eco);
256         return (rc);
257 }
258
259 static int
260 echo_get_object (struct ec_object **ecop, struct obd_device *obd,
261                  struct obdo *oa)
262 {
263         struct echo_client_obd *ec = &obd->u.echo_client;
264         struct ec_object       *eco;
265         struct ec_object       *eco2;
266         int                     rc;
267
268         if ((oa->o_valid & OBD_MD_FLID) == 0)
269         {
270                 CERROR ("No valid oid\n");
271                 return (-EINVAL);
272         }
273
274         spin_lock (&ec->ec_lock);
275         eco = echo_find_object_locked (obd, oa->o_id);
276         if (eco != NULL) {
277                 if (eco->eco_deleted)           /* being deleted */
278                         return (-EAGAIN);       /* (see comment in cleanup) */
279
280                 eco->eco_refcount++;
281                 spin_unlock (&ec->ec_lock);
282                 *ecop = eco;
283                 CDEBUG (D_INFO,
284                         "found %p: "LPX64"=%u#%u&%d refs %d del %d\n",
285                         eco, eco->eco_id,
286                         eco->eco_lsm->lsm_stripe_size,
287                         eco->eco_lsm->lsm_stripe_count,
288                         eco->eco_lsm->lsm_stripe_offset,
289                         eco->eco_refcount, eco->eco_deleted);
290                 return (0);
291         }
292         spin_unlock (&ec->ec_lock);
293
294         if (ec->ec_nstripes != 0)               /* striping required */
295                 return (-ENOENT);
296
297         eco = echo_allocate_object (obd);
298         if (eco == NULL)
299                 return (-ENOMEM);
300
301         eco->eco_id = eco->eco_lsm->lsm_object_id = oa->o_id;
302
303         spin_lock (&ec->ec_lock);
304
305         eco2 = echo_find_object_locked (obd, oa->o_id);
306         if (eco2 == NULL) {                     /* didn't race */
307                 list_add (&eco->eco_obj_chain, &ec->ec_objects);
308                 spin_unlock (&ec->ec_lock);
309                 eco->eco_refcount = 1;
310                 *ecop = eco;
311                 CDEBUG (D_INFO,
312                         "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
313                         eco, eco->eco_id,
314                         eco->eco_lsm->lsm_stripe_size,
315                         eco->eco_lsm->lsm_stripe_count,
316                         eco->eco_lsm->lsm_stripe_offset,
317                         eco->eco_refcount, eco->eco_deleted);
318                 return (0);
319         }
320
321         if (eco2->eco_deleted)
322                 rc = -EAGAIN;                   /* lose race */
323         else {
324                 eco2->eco_refcount++;           /* take existing */
325                 *ecop = eco2;
326                 rc = 0;
327                 LASSERT (eco2->eco_id == eco2->eco_lsm->lsm_object_id);
328                 CDEBUG (D_INFO,
329                         "found(2) %p: "LPX64"=%u#%u&%d refs %d del %d\n",
330                         eco2, eco2->eco_id,
331                         eco2->eco_lsm->lsm_stripe_size,
332                         eco2->eco_lsm->lsm_stripe_count,
333                         eco2->eco_lsm->lsm_stripe_offset,
334                         eco2->eco_refcount, eco2->eco_deleted);
335         }
336
337         spin_unlock (&ec->ec_lock);
338
339         echo_free_object (eco);
340         return (rc);
341 }
342
343 static void
344 echo_put_object (struct ec_object *eco)
345 {
346         struct obd_device      *obd = eco->eco_device;
347         struct echo_client_obd *ec = &obd->u.echo_client;
348
349         /* Release caller's ref on the object.
350          * delete => mark for deletion when last ref goes
351          */
352
353         spin_lock (&ec->ec_lock);
354
355         eco->eco_refcount--;
356         LASSERT (eco->eco_refcount >= 0);
357
358         if (eco->eco_refcount != 0 ||
359             !eco->eco_deleted) {
360                 spin_unlock (&ec->ec_lock);
361                 return;
362         }
363
364         spin_unlock (&ec->ec_lock);
365
366         /* NB leave obj in the object list.  We must prevent anyone from
367          * attempting to enqueue on this object number until we can be
368          * sure there will be no more lock callbacks.
369          */
370         obd_cancel_unused (&ec->ec_conn, eco->eco_lsm, 0);
371
372         /* now we can let it go */
373         spin_lock (&ec->ec_lock);
374         list_del (&eco->eco_obj_chain);
375         spin_unlock (&ec->ec_lock);
376
377         LASSERT (eco->eco_refcount == 0);
378
379         echo_free_object (eco);
380 }
381
382 static void
383 echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
384 {
385         unsigned long stripe_count;
386         unsigned long stripe_size;
387         unsigned long width;
388         unsigned long woffset;
389         int           stripe_index;
390         obd_off       offset;
391
392         if (lsm->lsm_stripe_count <= 1)
393                 return;
394
395         offset       = *offp;
396         stripe_size  = lsm->lsm_stripe_size;
397         stripe_count = lsm->lsm_stripe_count;
398
399         /* width = # bytes in all stripes */
400         width = stripe_size * stripe_count;
401
402         /* woffset = offset within a width; offset = whole number of widths */
403         woffset = do_div (offset, width);
404
405         stripe_index = woffset / stripe_size;
406
407         *idp = lsm->lsm_oinfo[stripe_index].loi_id;
408         *offp = offset * stripe_size + woffset % stripe_size;
409 }
410
411 static int
412 echo_client_kbrw (struct obd_device *obd, int rw,
413                   struct obdo *oa, struct lov_stripe_md *lsm,
414                   obd_off offset, obd_size count)
415 {
416         struct echo_client_obd *ec = &obd->u.echo_client;
417         struct obd_brw_set     *set;
418         obd_count               npages;
419         struct brw_page        *pga;
420         struct brw_page        *pgp;
421         obd_off                 off;
422         int                     i;
423         int                     rc;
424         int                     verify;
425         int                     gfp_mask;
426
427         /* oa_id  == 0    => speed test (no verification) else...
428          * oa & 1         => use HIGHMEM
429          */
430         verify = (oa->o_id != 0);
431         gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
432
433         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
434
435         if (count <= 0 ||
436             (count & (PAGE_SIZE - 1)) != 0 ||
437             (lsm != NULL &&
438              lsm->lsm_object_id != oa->o_id))
439                 return (-EINVAL);
440
441         set = obd_brw_set_new();
442         if (set == NULL)
443                 return (-ENOMEM);
444
445         /* XXX think again with misaligned I/O */
446         npages = count >> PAGE_SHIFT;
447
448         rc = -ENOMEM;
449         OBD_ALLOC(pga, npages * sizeof(*pga));
450         if (pga == NULL)
451                 goto out_0;
452
453         for (i = 0, pgp = pga, off = offset;
454              i < npages;
455              i++, pgp++, off += PAGE_SIZE) {
456
457                 LASSERT (pgp->pg == NULL);      /* for cleanup */
458
459                 rc = -ENOMEM;
460                 pgp->pg = alloc_pages (gfp_mask, 0);
461                 if (pgp->pg == NULL)
462                         goto out_1;
463
464                 pgp->count = PAGE_SIZE;
465                 pgp->off = off;
466                 pgp->flag = 0;
467
468                 if (verify) {
469                         void *addr = kmap(pgp->pg);
470                         obd_off      stripe_off = off;
471                         obd_id       stripe_id = oa->o_id;
472
473                         if (rw == OBD_BRW_WRITE) {
474                                 echo_get_stripe_off_id(lsm, &stripe_off,
475                                                        &stripe_id);
476                                 page_debug_setup(addr, pgp->count,
477                                                  stripe_off, stripe_id);
478                         } else {
479                                 page_debug_setup(addr, pgp->count,
480                                                  0xdeadbeef00c0ffee,
481                                                  0xdeadbeef00c0ffee);
482                         }
483                         kunmap(pgp->pg);
484                 }
485         }
486
487         set->brw_callback = ll_brw_sync_wait;
488         rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, set, NULL);
489         if (rc == 0)
490                 rc = ll_brw_sync_wait(set, CB_PHASE_START);
491
492  out_1:
493         if (rc != 0)
494                 verify = 0;
495
496         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
497                 if (pgp->pg == NULL)
498                         continue;
499
500                 if (verify) {
501                         void    *addr = kmap(pgp->pg);
502                         obd_off  stripe_off = pgp->off;
503                         obd_id   stripe_id  = oa->o_id;
504                         int      vrc;
505
506                         echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
507                         vrc = page_debug_check("test_brw", addr, pgp->count,
508                                                stripe_off, stripe_id);
509                         if (vrc != 0 && rc == 0)
510                                 rc = vrc;
511
512                         kunmap(pgp->pg);
513                 }
514                 __free_pages(pgp->pg, 0);
515         }
516         OBD_FREE(pga, npages * sizeof(*pga));
517  out_0:
518         obd_brw_set_decref(set);
519         return (rc);
520 }
521
522 #ifdef __KERNEL__
523 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
524 static int echo_client_ubrw(struct obd_device *obd, int rw,
525                             struct obdo *oa, struct lov_stripe_md *lsm,
526                             obd_off offset, obd_size count, char *buffer)
527 {
528         struct echo_client_obd *ec = &obd->u.echo_client;
529         struct obd_brw_set     *set;
530         obd_count               npages;
531         struct brw_page        *pga;
532         struct brw_page        *pgp;
533         obd_off                 off;
534         struct kiobuf          *kiobuf;
535         int                     i;
536         int                     rc;
537
538         LASSERT (rw == OBD_BRW_WRITE ||
539                  rw == OBD_BRW_READ);
540
541         /* NB: for now, only whole pages, page aligned */
542
543         if (count <= 0 ||
544             ((long)buffer & (PAGE_SIZE - 1)) != 0 ||
545             (count & (PAGE_SIZE - 1)) != 0 ||
546             (lsm != NULL && lsm->lsm_object_id != oa->o_id))
547                 return (-EINVAL);
548
549         set = obd_brw_set_new();
550         if (set == NULL)
551                 return (-ENOMEM);
552
553         /* XXX think again with misaligned I/O */
554         npages = count >> PAGE_SHIFT;
555
556         rc = -ENOMEM;
557         OBD_ALLOC(pga, npages * sizeof(*pga));
558         if (pga == NULL)
559                 goto out_0;
560
561         rc = alloc_kiovec (1, &kiobuf);
562         if (rc != 0)
563                 goto out_1;
564
565         rc = map_user_kiobuf ((rw == OBD_BRW_READ) ? READ : WRITE,
566                               kiobuf, (unsigned long)buffer, count);
567         if (rc != 0)
568                 goto out_2;
569
570         LASSERT (kiobuf->offset == 0);
571         LASSERT (kiobuf->nr_pages == npages);
572
573         for (i = 0, off = offset, pgp = pga;
574              i < npages;
575              i++, off += PAGE_SIZE, pgp++) {
576                 pgp->off = off;
577                 pgp->pg = kiobuf->maplist[i];
578                 pgp->count = PAGE_SIZE;
579                 pgp->flag = 0;
580         }
581
582         set->brw_callback = ll_brw_sync_wait;
583         rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, set, NULL);
584
585         if (rc == 0)
586                 rc = ll_brw_sync_wait(set, CB_PHASE_START);
587
588         //        if (rw == OBD_BRW_READ)
589         //                mark_dirty_kiobuf (kiobuf, count);
590
591         unmap_kiobuf (kiobuf);
592  out_2:
593         free_kiovec (1, &kiobuf);
594  out_1:
595         OBD_FREE(pga, npages * sizeof(*pga));
596  out_0:
597         obd_brw_set_decref(set);
598         return (rc);
599 }
600 #else
601 static int echo_client_ubrw(struct obd_device *obd, int rw,
602                             struct obdo *oa, struct lov_stripe_md *lsm,
603                             obd_off offset, obd_size count, char *buffer)
604 {
605         LBUG();
606         return 0;
607 }
608 #endif
609 #endif
610
611 static int
612 echo_open (struct obd_export *exp, struct obdo *oa)
613 {
614         struct obd_device      *obd = exp->exp_obd;
615         struct echo_client_obd *ec = &obd->u.echo_client;
616         struct lustre_handle   *ufh = obdo_handle (oa);
617         struct ec_open_object  *ecoo;
618         struct ec_object       *eco;
619         int                     rc;
620
621         rc = echo_get_object (&eco, obd, oa);
622         if (rc != 0)
623                 return (rc);
624
625         rc = -ENOMEM;
626         OBD_ALLOC (ecoo, sizeof (*ecoo));
627         if (ecoo == NULL)
628                 goto failed_0;
629
630         rc = obd_open (&ec->ec_conn, oa, eco->eco_lsm, NULL);
631         if (rc != 0)
632                 goto failed_1;
633
634         memcpy (&ecoo->ecoo_oa, oa, sizeof (*oa));
635         ecoo->ecoo_object = eco;
636         /* ecoo takes ref from echo_get_object() above */
637
638         spin_lock (&ec->ec_lock);
639
640         list_add (&ecoo->ecoo_exp_chain, &exp->exp_ec_data.eced_open_head);
641
642         ufh->addr = (__u64)((long) ecoo);
643         ufh->cookie = ecoo->ecoo_cookie = ec->ec_unique++;
644
645         spin_unlock (&ec->ec_lock);
646         return (0);
647
648  failed_1:
649         OBD_FREE (ecoo, sizeof (*ecoo));
650  failed_0:
651         echo_put_object (eco);
652         return (rc);
653 }
654
655 static int
656 echo_close (struct obd_export *exp, struct obdo *oa)
657 {
658         struct obd_device      *obd = exp->exp_obd;
659         struct echo_client_obd *ec = &obd->u.echo_client;
660         struct lustre_handle   *ufh = obdo_handle (oa);
661         struct ec_open_object  *ecoo = NULL;
662         int                     found = 0;
663         struct list_head       *el;
664         int                     rc;
665
666         if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
667                 return (-EINVAL);
668
669         spin_lock (&ec->ec_lock);
670
671         list_for_each (el, &exp->exp_ec_data.eced_open_head) {
672                 ecoo = list_entry (el, struct ec_open_object, ecoo_exp_chain);
673                 if ((__u64)((long)ecoo) == ufh->addr) {
674                         found = (ecoo->ecoo_cookie == ufh->cookie);
675                         if (found)
676                                 list_del (&ecoo->ecoo_exp_chain);
677                         break;
678                 }
679         }
680
681         spin_unlock (&ec->ec_lock);
682
683         if (!found)
684                 return (-EINVAL);
685
686         rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
687                         ecoo->ecoo_object->eco_lsm, NULL);
688
689         echo_put_object (ecoo->ecoo_object);
690         OBD_FREE (ecoo, sizeof (*ecoo));
691
692         return (rc);
693 }
694
695 static int
696 echo_ldlm_callback (struct ldlm_lock *lock, struct ldlm_lock_desc *new,
697                     void *data, int flag)
698 {
699         struct ec_object       *eco = (struct ec_object *)data;
700         struct echo_client_obd *ec = &(eco->eco_device->u.echo_client);
701         struct lustre_handle    lockh;
702         struct list_head       *el;
703         int                     found = 0;
704         int                     rc;
705
706         ldlm_lock2handle (lock, &lockh);
707
708         /* #ifdef this out if we're not feeling paranoid */
709         spin_lock (&ec->ec_lock);
710         list_for_each (el, &ec->ec_objects) {
711                 found = (eco == list_entry(el, struct ec_object,
712                                            eco_obj_chain));
713                 if (found)
714                         break;
715         }
716         spin_unlock (&ec->ec_lock);
717         LASSERT (found);
718
719         switch (flag) {
720         case LDLM_CB_BLOCKING:
721                 CDEBUG (D_INFO, "blocking callback on "LPX64", handle "LPX64"."
722                         LPX64"\n", eco->eco_id, lockh.addr, lockh.cookie);
723                 rc = ldlm_cli_cancel (&lockh);
724                 if (rc != ELDLM_OK)
725                         CERROR ("ldlm_cli_cancel failed: %d\n", rc);
726                 break;
727
728         case LDLM_CB_CANCELING:
729                 CDEBUG (D_INFO, "canceling callback on "LPX64", handle "LPX64"."
730                         LPX64"\n", eco->eco_id, lockh.addr, lockh.cookie);
731                 break;
732
733         default:
734                 LBUG ();
735         }
736
737         return (0);
738 }
739
740 static int
741 echo_enqueue (struct obd_export *exp, struct obdo *oa,
742               int mode, obd_off offset, obd_size nob)
743 {
744         struct obd_device      *obd = exp->exp_obd;
745         struct echo_client_obd *ec = &obd->u.echo_client;
746         struct lustre_handle   *ulh = obdo_handle (oa);
747         struct ec_object       *eco;
748         struct ec_lock         *ecl;
749         int                     flags;
750         int                     rc;
751
752         if (!(mode == LCK_PR || mode == LCK_PW))
753                 return (-EINVAL);
754
755         if ((offset & (PAGE_SIZE - 1)) != 0 ||
756             (nob & (PAGE_SIZE - 1)) != 0)
757                 return (-EINVAL);
758
759         rc = echo_get_object (&eco, obd, oa);
760         if (rc != 0)
761                 return (rc);
762
763         rc = -ENOMEM;
764         OBD_ALLOC (ecl, sizeof (*ecl));
765         if (ecl == NULL)
766                 goto failed_0;
767
768         ecl->ecl_mode = mode;
769         ecl->ecl_object = eco;
770         ecl->ecl_extent.start = offset;
771         ecl->ecl_extent.end = (nob == 0) ? ((obd_off)-1) : (offset + nob - 1);
772
773         flags = 0;
774         rc = obd_enqueue (&ec->ec_conn, eco->eco_lsm, NULL, LDLM_EXTENT,
775                           &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode,
776                           &flags, echo_ldlm_callback, eco, sizeof (*eco),
777                           &ecl->ecl_handle);
778         if (rc != 0)
779                 goto failed_1;
780
781         CDEBUG (D_INFO, "enqueue handle "LPX64"."LPX64"\n",
782                 ecl->ecl_handle.addr, ecl->ecl_handle.cookie);
783
784         /* NB ecl takes object ref from echo_get_object() above */
785
786         spin_lock (&ec->ec_lock);
787
788         list_add (&ecl->ecl_exp_chain, &exp->exp_ec_data.eced_locks);
789
790         ulh->addr = (__u64)((long)ecl);
791         ulh->cookie = ecl->ecl_cookie = ec->ec_unique++;
792
793         spin_unlock (&ec->ec_lock);
794
795         oa->o_valid |= OBD_MD_FLHANDLE;
796         return (0);
797
798  failed_1:
799         OBD_FREE (ecl, sizeof (*ecl));
800  failed_0:
801         echo_put_object (eco);
802         return (rc);
803 }
804
805 static int
806 echo_cancel (struct obd_export *exp, struct obdo *oa)
807 {
808         struct obd_device      *obd = exp->exp_obd;
809         struct echo_client_obd *ec = &obd->u.echo_client;
810         struct lustre_handle   *ulh = obdo_handle (oa);
811         struct ec_lock         *ecl = NULL;
812         int                     found = 0;
813         struct list_head       *el;
814         int                     rc;
815
816         if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
817                 return (-EINVAL);
818
819         spin_lock (&ec->ec_lock);
820
821         list_for_each (el, &exp->exp_ec_data.eced_locks) {
822                 ecl = list_entry (el, struct ec_lock, ecl_exp_chain);
823
824                 if ((__u64)((long)ecl) == ulh->addr) {
825                         found = (ecl->ecl_cookie == ulh->cookie);
826                         if (found)
827                                 list_del (&ecl->ecl_exp_chain);
828                         break;
829                 }
830         }
831
832         spin_unlock (&ec->ec_lock);
833
834         if (!found)
835                 return (-ENOENT);
836
837         rc = obd_cancel (&ec->ec_conn,
838                          ecl->ecl_object->eco_lsm,
839                          ecl->ecl_mode,
840                          &ecl->ecl_handle);
841
842         echo_put_object (ecl->ecl_object);
843         OBD_FREE (ecl, sizeof (*ecl));
844
845         return (rc);
846 }
847
848 static int echo_iocontrol(unsigned int cmd, struct lustre_handle *obdconn,
849                           int len, void *karg, void *uarg)
850 {
851         struct obd_export      *exp = class_conn2export (obdconn);
852         struct obd_device      *obd;
853         struct echo_client_obd *ec;
854         struct ec_object       *eco;
855         struct obd_ioctl_data  *data = karg;
856         int                     rw = OBD_BRW_READ;
857         int                     rc = 0;
858         ENTRY;
859
860         if (exp == NULL) {
861                 CERROR("ioctl: No device\n");
862                 GOTO(out, rc = -EINVAL);
863         }
864
865         obd = exp->exp_obd;
866         ec = &obd->u.echo_client;
867
868         switch (cmd) {
869         case OBD_IOC_CREATE:                    /* may create echo object */
870                 if (!capable (CAP_SYS_ADMIN))
871                         GOTO (out, rc = -EPERM);
872
873                 rc = echo_create_object (obd, 1, &data->ioc_obdo1,
874                                          data->ioc_pbuf1, data->ioc_plen1);
875                 GOTO(out, rc);
876
877         case OBD_IOC_DESTROY:
878                 if (!capable (CAP_SYS_ADMIN))
879                         GOTO (out, rc = -EPERM);
880
881                 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
882                 if (rc == 0) {
883                         rc = obd_destroy(&ec->ec_conn, &data->ioc_obdo1,
884                                          eco->eco_lsm, NULL);
885                         if (rc == 0)
886                                 eco->eco_deleted = 1;
887                         echo_put_object(eco);
888                 }
889                 GOTO(out, rc);
890
891         case OBD_IOC_GETATTR:
892                 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
893                 if (rc == 0) {
894                         rc = obd_getattr(&ec->ec_conn, &data->ioc_obdo1,
895                                          eco->eco_lsm);
896                         echo_put_object(eco);
897                 }
898                 GOTO(out, rc);
899
900         case OBD_IOC_SETATTR:
901                 if (!capable (CAP_SYS_ADMIN))
902                         GOTO (out, rc = -EPERM);
903
904                 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
905                 if (rc == 0) {
906                         rc = obd_setattr(&ec->ec_conn, &data->ioc_obdo1,
907                                          eco->eco_lsm, NULL);
908                         echo_put_object(eco);
909                 }
910                 GOTO(out, rc);
911
912         case OBD_IOC_OPEN:
913                 rc = echo_open (exp, &data->ioc_obdo1);
914                 GOTO(out, rc);
915
916         case OBD_IOC_CLOSE:
917                 rc = echo_close (exp, &data->ioc_obdo1);
918                 GOTO(out, rc);
919
920         case OBD_IOC_BRW_WRITE:
921                 if (!capable (CAP_SYS_ADMIN))
922                         GOTO (out, rc = -EPERM);
923
924                 rw = OBD_BRW_WRITE;
925                 /* fall through */
926         case OBD_IOC_BRW_READ:
927                 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
928                 if (rc == 0) {
929                         if (data->ioc_pbuf2 == NULL) // NULL user data pointer
930                                 rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
931                                                       eco->eco_lsm,
932                                                       data->ioc_offset,
933                                                       data->ioc_count);
934                         else
935 #ifdef __KERNEL__
936                                 rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
937                                                       eco->eco_lsm,
938                                                       data->ioc_offset,
939                                                       data->ioc_count,
940                                                       data->ioc_pbuf2);
941 #endif
942                         echo_put_object(eco);
943                 }
944                 GOTO(out, rc);
945
946         case ECHO_IOC_GET_STRIPE:
947                 rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
948                 if (rc == 0) {
949                         rc = echo_copyout_lsm(eco->eco_lsm, data->ioc_pbuf1,
950                                               data->ioc_plen1);
951                         echo_put_object(eco);
952                 }
953                 GOTO(out, rc);
954
955         case ECHO_IOC_SET_STRIPE:
956                 if (!capable (CAP_SYS_ADMIN))
957                         GOTO (out, rc = -EPERM);
958
959                 if (data->ioc_pbuf1 == NULL) {  /* unset */
960                         rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
961                         if (rc == 0) {
962                                 eco->eco_deleted = 1;
963                                 echo_put_object(eco);
964                         }
965                 } else {
966                         rc = echo_create_object(obd, 0, &data->ioc_obdo1,
967                                                 data->ioc_pbuf1,
968                                                 data->ioc_plen1);
969                 }
970                 GOTO (out, rc);
971
972         case ECHO_IOC_ENQUEUE:
973                 if (!capable (CAP_SYS_ADMIN))
974                         GOTO (out, rc = -EPERM);
975
976                 rc = echo_enqueue (exp, &data->ioc_obdo1,
977                                    data->ioc_conn1, /* lock mode */
978                                    data->ioc_offset, data->ioc_count);/*extent*/
979                 GOTO (out, rc);
980
981         case ECHO_IOC_CANCEL:
982                 rc = echo_cancel (exp, &data->ioc_obdo1);
983                 GOTO (out, rc);
984
985         default:
986                 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
987                 GOTO (out, rc = -ENOTTY);
988         }
989
990  out:
991         RETURN(rc);
992 }
993
994 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
995 {
996         struct obd_ioctl_data* data = buf;
997         struct echo_client_obd *ec = &obddev->u.echo_client;
998         struct obd_device *tgt;
999         struct obd_uuid uuid;
1000         struct lov_stripe_md *lsm = NULL;
1001         struct obd_uuid echo_uuid = { "ECHO_UUID" };
1002         int rc;
1003         ENTRY;
1004
1005         if (data->ioc_inllen1 < 1) {
1006                 CERROR("requires a TARGET OBD UUID\n");
1007                 RETURN(-EINVAL);
1008         }
1009         if (data->ioc_inllen1 > 37) {
1010                 CERROR("OBD UUID must be less than 38 characters\n");
1011                 RETURN(-EINVAL);
1012         }
1013
1014         obd_str2uuid(&uuid, data->ioc_inlbuf1);
1015         tgt = class_uuid2obd(&uuid);
1016         if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
1017             !(tgt->obd_flags & OBD_SET_UP)) {
1018                 CERROR("device not attached or not set up (%d)\n",
1019                        data->ioc_dev);
1020                 RETURN(rc = -EINVAL);
1021         }
1022
1023         spin_lock_init (&ec->ec_lock);
1024         INIT_LIST_HEAD (&ec->ec_objects);
1025         ec->ec_unique = 0;
1026
1027         rc = obd_connect(&ec->ec_conn, tgt, &echo_uuid, NULL, NULL);
1028         if (rc) {
1029                 CERROR("fail to connect to device %d\n", data->ioc_dev);
1030                 return (rc);
1031         }
1032
1033         ec->ec_lsmsize = obd_alloc_memmd (&ec->ec_conn, &lsm);
1034         if (ec->ec_lsmsize < 0) {
1035                 CERROR ("Can't get # stripes: %d\n", rc);
1036                 obd_disconnect (&ec->ec_conn);
1037                 rc = ec->ec_lsmsize;
1038         } else {
1039                 ec->ec_nstripes = lsm->lsm_stripe_count;
1040                 obd_free_memmd (&ec->ec_conn, &lsm);
1041         }
1042
1043         RETURN(rc);
1044 }
1045
1046 static int echo_cleanup(struct obd_device * obddev)
1047 {
1048         struct list_head       *el;
1049         struct ec_object       *eco;
1050         struct echo_client_obd *ec = &obddev->u.echo_client;
1051         int rc;
1052         ENTRY;
1053
1054         if (!list_empty(&obddev->obd_exports)) {
1055                 CERROR("still has clients!\n");
1056                 RETURN(-EBUSY);
1057         }
1058
1059         /* XXX assuming sole access */
1060         while (!list_empty (&ec->ec_objects)) {
1061                 el = ec->ec_objects.next;
1062                 eco = list_entry (el, struct ec_object, eco_obj_chain);
1063
1064                 LASSERT (eco->eco_refcount == 0);
1065                 eco->eco_refcount = 1;
1066                 eco->eco_deleted = 1;
1067                 echo_put_object (eco);
1068         }
1069
1070         rc = obd_disconnect (&ec->ec_conn);
1071         if (rc != 0)
1072                 CERROR("fail to disconnect device: %d\n", rc);
1073
1074         RETURN (rc);
1075 }
1076
1077 static int echo_connect(struct lustre_handle *conn, struct obd_device *src,
1078                         struct obd_uuid *cluuid, struct recovd_obd *recovd,
1079                         ptlrpc_recovery_cb_t recover)
1080 {
1081         struct obd_export *exp;
1082         int                rc;
1083
1084         rc = class_connect(conn, src, cluuid);
1085         if (rc == 0) {
1086                 exp = class_conn2export (conn);
1087                 INIT_LIST_HEAD (&exp->exp_ec_data.eced_open_head);
1088                 INIT_LIST_HEAD (&exp->exp_ec_data.eced_locks);
1089         }
1090
1091         RETURN (rc);
1092 }
1093
1094 static int echo_disconnect(struct lustre_handle *conn)
1095 {
1096         struct obd_export      *exp = class_conn2export (conn);
1097         struct obd_device      *obd;
1098         struct echo_client_obd *ec;
1099         struct ec_open_object  *ecoo;
1100         struct ec_lock         *ecl;
1101         int                     rc;
1102
1103         if (exp == NULL)
1104                 return (-EINVAL);
1105
1106         obd = exp->exp_obd;
1107         ec = &obd->u.echo_client;
1108
1109         /* no more contention on export's lock list */
1110         while (!list_empty (&exp->exp_ec_data.eced_locks)) {
1111                 ecl = list_entry (exp->exp_ec_data.eced_locks.next,
1112                                   struct ec_lock, ecl_exp_chain);
1113                 list_del (&ecl->ecl_exp_chain);
1114
1115                 rc = obd_cancel (&ec->ec_conn, ecl->ecl_object->eco_lsm,
1116                                  ecl->ecl_mode, &ecl->ecl_handle);
1117
1118                 CERROR ("Cancel lock on object "LPX64" on disconnect (%d)\n",
1119                         ecl->ecl_object->eco_id, rc);
1120
1121                 echo_put_object (ecl->ecl_object);
1122                 OBD_FREE (ecl, sizeof (*ecl));
1123         }
1124
1125         /* no more contention on export's open handle list  */
1126         while (!list_empty (&exp->exp_ec_data.eced_open_head)) {
1127                 ecoo = list_entry (exp->exp_ec_data.eced_open_head.next,
1128                                    struct ec_open_object, ecoo_exp_chain);
1129                 list_del (&ecoo->ecoo_exp_chain);
1130
1131                 rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
1132                                 ecoo->ecoo_object->eco_lsm, NULL);
1133
1134                 CDEBUG (D_INFO, "Closed object "LPX64" on disconnect (%d)\n",
1135                         ecoo->ecoo_oa.o_id, rc);
1136
1137                 echo_put_object (ecoo->ecoo_object);
1138                 OBD_FREE (ecoo, sizeof (*ecoo));
1139         }
1140
1141         rc = class_disconnect (conn);
1142         RETURN (rc);
1143 }
1144
1145 static struct obd_ops echo_obd_ops = {
1146         o_owner:       THIS_MODULE,
1147         o_setup:       echo_setup,
1148         o_cleanup:     echo_cleanup,
1149         o_iocontrol:   echo_iocontrol,
1150         o_connect:     echo_connect,
1151         o_disconnect:  echo_disconnect
1152 };
1153
1154 int echo_client_init(void)
1155 {
1156         struct lprocfs_static_vars lvars;
1157
1158         lprocfs_init_vars(&lvars);
1159         return class_register_type(&echo_obd_ops, lvars.module_vars,
1160                                    OBD_ECHO_CLIENT_DEVICENAME);
1161 }
1162
1163 void echo_client_cleanup(void)
1164 {
1165         class_unregister_type(OBD_ECHO_CLIENT_DEVICENAME);
1166 }