1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 #include <linux/version.h>
23 #include <linux/module.h>
25 #include <linux/iobuf.h>
26 #include <asm/div64.h>
28 #define DEBUG_SUBSYSTEM S_ECHO
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/obd_echo.h>
33 #include <linux/lustre_debug.h>
34 #include <linux/lprocfs_status.h>
35 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_SETSTRIPE */
/* Print a description of an echo client object to the kernel log at
 * KERN_INFO: one summary line, then one line per stripe.
 *
 * msg: caller-supplied prefix for the summary line.
 * eco: object to describe; eco->eco_lsm is dereferenced without a NULL
 *      check.
 *
 * NOTE(review): this view of the definition is missing lines (the return
 * type, the braces and the declaration of i are not visible). */
39 echo_printk_object (char *msg, struct ec_object *eco)
41 struct lov_stripe_md *lsm = eco->eco_lsm;
/* summary line: object pointer, id, refcount, deleted marker, then the
 * stripe metadata printed as id=stripe_size!stripe_count@stripe_offset */
44 printk (KERN_INFO "%s: object %p: "LPX64", refs %d%s: "LPX64
45 "=%u!%u@%d\n", msg, eco, eco->eco_id, eco->eco_refcount,
46 eco->eco_deleted ? "(deleted) " : "",
47 lsm->lsm_object_id, lsm->lsm_stripe_size,
48 lsm->lsm_stripe_count, lsm->lsm_stripe_offset);
/* per stripe: OST index in brackets followed by that stripe's object id */
50 for (i = 0; i < lsm->lsm_stripe_count; i++)
51 printk (KERN_INFO " [%2u]"LPX64"\n",
52 lsm->lsm_oinfo[i].loi_ost_idx,
53 lsm->lsm_oinfo[i].loi_id);
/* Search the device's object list (ec->ec_objects) for the entry whose
 * eco_id equals id.
 *
 * obd: echo client device whose list is walked.
 * id:  object id to look for.
 *
 * The visible callers in this file invoke this between spin_lock and
 * spin_unlock on ec->ec_lock and compare the result against NULL.
 *
 * NOTE(review): the declaration of el and both return statements are not
 * visible in this view; presumably the matching entry is returned, else
 * NULL -- confirm against the full source. */
57 static struct ec_object *
58 echo_find_object_locked (struct obd_device *obd, obd_id id)
60 struct echo_client_obd *ec = &obd->u.echo_client;
61 struct ec_object *eco = NULL;
64 list_for_each (el, &ec->ec_objects) {
/* recover the containing ec_object from its eco_obj_chain list link */
65 eco = list_entry (el, struct ec_object, eco_obj_chain);
67 if (eco->eco_id == id)
/* Copy stripe metadata out to a user-space buffer.
 *
 * lsm:      kernel stripe metadata to copy.
 * ulsm:     user-space destination, written with copy_to_user.
 * ulsm_nob: size of the user buffer in bytes (how it is checked is not
 *           visible in this view).
 *
 * NOTE(review): lines are missing from this view, including the return
 * type, the declaration of nob and the error paths. */
74 echo_copyout_lsm (struct lov_stripe_md *lsm, void *ulsm, int ulsm_nob)
/* bytes to copy: everything up to the end of the last used lsm_oinfo entry */
78 nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
/* a non-zero result from copy_to_user means not all bytes were copied */
82 if (copy_to_user (ulsm, lsm, nob))
/* Copy stripe metadata in from user space into lsm and sanity-check it.
 *
 * obd:      echo client device; ec_nstripes and ec_lsmsize bound the input.
 * lsm:      kernel destination buffer.
 * ulsm:     user-space source, read with copy_from_user.
 * ulsm_nob: size of the user buffer in bytes.
 *
 * The fixed-size part is copied first and validated; only afterwards are
 * the per-stripe lsm_oinfo entries copied.
 *
 * NOTE(review): the statements executed when a check fails, the declaration
 * of nob and the final return are not visible in this view. */
89 echo_copyin_lsm (struct obd_device *obd, struct lov_stripe_md *lsm,
90 void *ulsm, int ulsm_nob)
92 struct echo_client_obd *ec = &obd->u.echo_client;
/* NOTE(review): ulsm_nob is a signed int compared with a size_t, so a
 * negative length converts to a large unsigned value and this comparison
 * is false for it; confirm callers cannot supply a negative length. */
95 if (ulsm_nob < sizeof (*lsm))
98 if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
/* size of the per-stripe array the user claims to supply */
101 nob = lsm->lsm_stripe_count * sizeof (lsm->lsm_oinfo[0]);
/* Conditions tested below: user buffer shorter than nob, more stripes than
 * the device has, wrong magic, a stripe offset that is neither 0 nor
 * 0xffffffff and is not below ec_nstripes, a stripe size that is not a
 * multiple of PAGE_SIZE, or stripe_size * stripe_count exceeding ~0UL.
 * NOTE(review): ulsm_nob is compared with nob alone, whereas the LASSERT
 * below uses sizeof (*lsm) + nob; confirm which bound is intended. */
103 if (ulsm_nob < nob ||
104 lsm->lsm_stripe_count > ec->ec_nstripes ||
105 lsm->lsm_magic != LOV_MAGIC ||
106 (lsm->lsm_stripe_offset != 0 &&
107 lsm->lsm_stripe_offset != 0xffffffff &&
108 lsm->lsm_stripe_offset >= ec->ec_nstripes) ||
109 (lsm->lsm_stripe_size & (PAGE_SIZE - 1)) != 0 ||
110 ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
/* the destination buffer must be able to hold header plus stripe array */
113 LASSERT (ec->ec_lsmsize >= sizeof (*lsm) + nob);
115 if (copy_from_user(lsm->lsm_oinfo,
116 ((struct lov_stripe_md *)ulsm)->lsm_oinfo, nob))
/* Allocate an ec_object together with its stripe metadata buffer.
 *
 * obd: owning echo client device; ec->ec_lsmsize gives the metadata size.
 *
 * The visible initialisation sets eco_device = obd, eco_deleted = 0,
 * eco_refcount = 0 and lsm_magic = LOV_MAGIC. If the metadata allocation
 * fails, the object itself is freed again.
 *
 * NOTE(review): whatever follows the first allocation, and the return
 * statements, are not visible in this view. */
122 static struct ec_object *
123 echo_allocate_object (struct obd_device *obd)
125 struct echo_client_obd *ec = &obd->u.echo_client;
126 struct ec_object *eco;
128 OBD_ALLOC (eco, sizeof (*eco));
132 OBD_ALLOC (eco->eco_lsm, ec->ec_lsmsize);
133 if (eco->eco_lsm == NULL) {
/* undo the first allocation so nothing leaks on this failure path */
134 OBD_FREE (eco, sizeof (*eco));
138 eco->eco_device = obd;
139 eco->eco_deleted = 0;
140 eco->eco_refcount = 0;
141 eco->eco_lsm->lsm_magic = LOV_MAGIC;
/* NOTE(review): presumably relies on OBD_ALLOC returning zeroed memory
 * for the stripe count mentioned below -- TODO confirm. */
142 /* leave stripe count 0 by default */
/* Release an ec_object and its stripe metadata buffer.
 *
 * eco: object to free; it must have no references left (asserted).
 *
 * The metadata is freed with the size recorded in the owning device
 * (ec->ec_lsmsize), matching the size used in echo_allocate_object.
 *
 * NOTE(review): the return type and braces are not visible in this view. */
148 echo_free_object (struct ec_object *eco)
150 struct obd_device *obd = eco->eco_device;
151 struct echo_client_obd *ec = &obd->u.echo_client;
153 LASSERT (eco->eco_refcount == 0);
/* free the metadata first: it is reached through eco */
154 OBD_FREE (eco->eco_lsm, ec->ec_lsmsize);
155 OBD_FREE (eco, sizeof (*eco));
/* Create an echo object, optionally creating it on the target as well,
 * and add it to the device's object list.
 *
 * obd:       echo client device.
 * on_target: non-zero in the OBD_IOC_CREATE caller, zero in the
 *            ECHO_IOC_SET_STRIPE caller (both visible in echo_iocontrol).
 * oa:        object attributes; o_id is used when OBD_MD_FLID is set.
 * ulsm:      user-space stripe metadata handed to echo_copyin_lsm.
 * ulsm_nob:  size of that user buffer in bytes.
 *
 * Visible flow: validate the id requirement, allocate the object, copy in
 * and default the stripe metadata, call obd_create, then insert the object
 * into ec->ec_objects under ec->ec_lock unless the id already exists.
 *
 * NOTE(review): many lines are missing from this view (return type, local
 * declarations, the assignment of lsm, the conditions guarding
 * echo_copyin_lsm, obd_create and obd_destroy, all returns and labels). */
159 echo_create_object (struct obd_device *obd, int on_target, struct obdo *oa,
160 void *ulsm, int ulsm_nob)
162 struct echo_client_obd *ec = &obd->u.echo_client;
163 struct ec_object *eco2;
164 struct ec_object *eco;
165 struct lov_stripe_md *lsm;
169 if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
170 (on_target || /* set_stripe */
171 ec->ec_nstripes != 0)) { /* LOV */
172 CERROR ("No valid oid\n");
176 eco = echo_allocate_object (obd);
183 rc = echo_copyin_lsm (obd, lsm, ulsm, ulsm_nob);
188 /* setup object ID here for !on_target and LOV hint */
189 if ((oa->o_valid & OBD_MD_FLID) != 0)
190 eco->eco_id = lsm->lsm_object_id = oa->o_id;
192 /* defaults -> actual values */
193 if (lsm->lsm_stripe_offset == 0xffffffff)
194 lsm->lsm_stripe_offset = 0;
196 if (lsm->lsm_stripe_count == 0)
197 lsm->lsm_stripe_count = ec->ec_nstripes;
199 if (lsm->lsm_stripe_size == 0)
200 lsm->lsm_stripe_size = PAGE_SIZE;
202 /* setup stripes: indices + default ids if required */
203 for (i = 0; i < lsm->lsm_stripe_count; i++) {
204 if (lsm->lsm_oinfo[i].loi_id == 0)
205 lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id;
/* stripes are laid out round-robin starting at lsm_stripe_offset */
207 lsm->lsm_oinfo[i].loi_ost_idx =
208 (lsm->lsm_stripe_offset + i) % ec->ec_nstripes;
212 rc = obd_create (&ec->ec_conn, oa, &lsm, NULL);
216 /* See what object ID we were given */
217 LASSERT ((oa->o_valid & OBD_MD_FLID) != 0);
218 eco->eco_id = lsm->lsm_object_id = oa->o_id;
/* the duplicate check and the insertion happen under the same lock hold */
221 spin_lock (&ec->ec_lock);
223 eco2 = echo_find_object_locked (obd, oa->o_id);
224 if (eco2 != NULL) { /* conflict */
225 spin_unlock (&ec->ec_lock);
227 CERROR ("Can't create object id "LPX64": id already exists%s\n",
228 oa->o_id, on_target ? " (undoing create)" : "");
231 obd_destroy (&ec->ec_conn, oa, lsm, NULL);
237 list_add (&eco->eco_obj_chain, &ec->ec_objects);
238 spin_unlock (&ec->ec_lock);
240 "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
242 eco->eco_lsm->lsm_stripe_size,
243 eco->eco_lsm->lsm_stripe_count,
244 eco->eco_lsm->lsm_stripe_offset,
245 eco->eco_refcount, eco->eco_deleted);
/* NOTE(review): presumably the shared failure exit; the label is not
 * visible in this view. */
249 echo_free_object (eco);
/* Look up the echo object with id oa->o_id and take a reference on it; a
 * new list entry is allocated and inserted when the lookup fails and the
 * code reaches the allocation below.
 *
 * ecop: output pointer (the assignments to it are not visible in this view).
 * obd:  echo client device.
 * oa:   o_valid is tested for OBD_MD_FLID; o_id selects the object.
 *
 * Visible flow: look the id up under ec->ec_lock; test ec_nstripes (the
 * action taken when it is non-zero is not visible, presumably an error
 * return); allocate a fresh object outside the lock; repeat the lookup
 * under the lock so that an object inserted meanwhile is used instead and
 * the fresh one is freed. -EAGAIN is produced for an object marked deleted.
 *
 * NOTE(review): several lines are missing from this view (return type,
 * braces, the NULL tests on eco, most return statements, the heads of the
 * debug calls). */
254 echo_get_object (struct ec_object **ecop, struct obd_device *obd, struct obdo *oa)
256 struct echo_client_obd *ec = &obd->u.echo_client;
257 struct ec_object *eco;
258 struct ec_object *eco2;
261 if ((oa->o_valid & OBD_MD_FLID) == 0)
263 CERROR ("No valid oid\n");
267 spin_lock (&ec->ec_lock);
268 eco = echo_find_object_locked (obd, oa->o_id);
/* NOTE(review): the return below is reached after spin_lock on
 * ec->ec_lock and no unlock is visible before it, so the lock appears to
 * stay held on this -EAGAIN path; confirm and unlock before returning. */
270 if (eco->eco_deleted) /* being deleted */
271 return (-EAGAIN); /* (see comment in cleanup) */
274 spin_unlock (&ec->ec_lock);
277 "found %p: "LPX64"=%u#%u&%d refs %d del %d\n",
279 eco->eco_lsm->lsm_stripe_size,
280 eco->eco_lsm->lsm_stripe_count,
281 eco->eco_lsm->lsm_stripe_offset,
282 eco->eco_refcount, eco->eco_deleted);
285 spin_unlock (&ec->ec_lock);
287 if (ec->ec_nstripes != 0) /* striping required */
/* allocation is done with the lock dropped; the lookup is redone below */
290 eco = echo_allocate_object (obd);
294 eco->eco_id = eco->eco_lsm->lsm_object_id = oa->o_id;
296 spin_lock (&ec->ec_lock);
298 eco2 = echo_find_object_locked (obd, oa->o_id);
299 if (eco2 == NULL) { /* didn't race */
300 list_add (&eco->eco_obj_chain, &ec->ec_objects);
301 spin_unlock (&ec->ec_lock);
/* NOTE(review): the refcount is set only after the object is on the list
 * and the lock has been dropped, so a concurrent lookup could see, and
 * increment, a zero refcount before this store; confirm whether this
 * assignment should happen before list_add. */
302 eco->eco_refcount = 1;
305 "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
307 eco->eco_lsm->lsm_stripe_size,
308 eco->eco_lsm->lsm_stripe_count,
309 eco->eco_lsm->lsm_stripe_offset,
310 eco->eco_refcount, eco->eco_deleted);
314 if (eco2->eco_deleted)
315 rc = -EAGAIN; /* lose race */
317 eco2->eco_refcount++; /* take existing */
320 LASSERT (eco2->eco_id == eco2->eco_lsm->lsm_object_id);
322 "found(2) %p: "LPX64"=%u#%u&%d refs %d del %d\n",
324 eco2->eco_lsm->lsm_stripe_size,
325 eco2->eco_lsm->lsm_stripe_count,
326 eco2->eco_lsm->lsm_stripe_offset,
327 eco2->eco_refcount, eco2->eco_deleted);
330 spin_unlock (&ec->ec_lock);
/* the freshly allocated object was not inserted on this path */
332 echo_free_object (eco);
/* Drop the caller's reference on an echo object and destroy the object
 * when the visible conditions allow it.
 *
 * eco: object whose reference is released.
 *
 * Visible flow: under ec->ec_lock the refcount is asserted non-negative
 * and tested against zero (the second half of that test is not visible);
 * on the destroy path the lock is dropped, obd_cancel_unused is called
 * with the object still on the list, then the object is unlinked under
 * the lock and freed.
 *
 * NOTE(review): the return type, the refcount decrement, the early return
 * and the terminators of the two multi-line comments inside this function
 * are not visible in this view. */
337 echo_put_object (struct ec_object *eco)
339 struct obd_device *obd = eco->eco_device;
340 struct echo_client_obd *ec = &obd->u.echo_client;
342 /* Release caller's ref on the object.
343 * delete => mark for deletion when last ref goes
346 spin_lock (&ec->ec_lock);
349 LASSERT (eco->eco_refcount >= 0);
351 if (eco->eco_refcount != 0 ||
353 spin_unlock (&ec->ec_lock);
357 spin_unlock (&ec->ec_lock);
359 /* NB leave obj in the object list. We must prevent anyone from
360 * attempting to enqueue on this object number until we can be
361 * sure there will be no more lock callbacks.
363 obd_cancel_unused (&ec->ec_conn, eco->eco_lsm, 0);
365 /* now we can let it go */
366 spin_lock (&ec->ec_lock);
367 list_del (&eco->eco_obj_chain);
368 spin_unlock (&ec->ec_lock);
/* nobody may have taken a reference while the lock was dropped */
370 LASSERT (eco->eco_refcount == 0);
372 echo_free_object (eco);
/* Translate a file offset into the object id of the stripe that holds it
 * and the offset within that stripe object.
 *
 * lsm:  stripe metadata (stripe size, stripe count, per-stripe ids).
 * offp: written with the per-stripe offset; presumably also the input
 *       offset, the initial read is not visible in this view.
 * idp:  written with lsm_oinfo[stripe_index].loi_id.
 *
 * NOTE(review): the return type, the declarations of offset, width and
 * stripe_index, and the action taken for stripe_count <= 1 are not visible
 * in this view. */
376 echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
378 unsigned long stripe_count;
379 unsigned long stripe_size;
381 unsigned long woffset;
385 if (lsm->lsm_stripe_count <= 1)
389 stripe_size = lsm->lsm_stripe_size;
390 stripe_count = lsm->lsm_stripe_count;
392 /* width = # bytes in all stripes */
393 width = stripe_size * stripe_count;
/* NOTE(review): do_div is understood to take a 32-bit divisor; width is a
 * product of two unsigned long values -- confirm it cannot exceed 32 bits
 * on 64-bit builds. */
395 /* woffset = offset within a width; offset = whole number of widths */
396 woffset = do_div (offset, width);
/* which stripe of the current width the byte falls into */
398 stripe_index = woffset / stripe_size;
400 *idp = lsm->lsm_oinfo[stripe_index].loi_id;
/* full widths before this one contribute one stripe_size each to the
 * stripe object, plus the position inside the current stripe */
401 *offp = offset * stripe_size + woffset % stripe_size;
/* Bulk read or write using pages allocated by the kernel.
 *
 * obd:    echo client device; I/O goes through ec->ec_conn.
 * rw:     OBD_BRW_WRITE or OBD_BRW_READ (asserted).
 * oa:     object attributes; o_id != 0 enables verification and the low
 *         bit of o_id selects GFP_HIGHUSER instead of GFP_KERNEL.
 * lsm:    stripe metadata passed to obd_brw.
 * offset: starting offset of the transfer.
 * count:  length in bytes; tested for being a multiple of PAGE_SIZE.
 *
 * Visible flow: allocate a brw set and an array of count >> PAGE_SHIFT
 * brw_page entries, allocate one page per entry, stamp each page with
 * page_debug_setup, run obd_brw and wait with ll_brw_sync_wait, check
 * pages with page_debug_check, then free the pages, the array and the set.
 *
 * NOTE(review): many lines are missing from this view (return type, local
 * declarations, parts of the argument checks, error handling, the loop
 * bounds, and the terminator of the comment that starts with oa_id == 0).
 * NOTE(review): kmap is called in both loops and no matching kunmap is
 * visible here -- confirm it exists in the full source. */
405 echo_client_kbrw (struct obd_device *obd, int rw,
406 struct obdo *oa, struct lov_stripe_md *lsm,
407 obd_off offset, obd_size count)
409 struct echo_client_obd *ec = &obd->u.echo_client;
410 struct obd_brw_set *set;
412 struct brw_page *pga;
413 struct brw_page *pgp;
420 /* oa_id == 0 => speed test (no verification) else...
421 * oa & 1 => use HIGHMEM
423 verify = (oa->o_id != 0);
424 gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
426 LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
429 (count & (PAGE_SIZE - 1)) != 0 ||
431 lsm->lsm_object_id != oa->o_id))
434 set = obd_brw_set_new();
438 /* XXX think again with misaligned I/O */
439 npages = count >> PAGE_SHIFT;
442 OBD_ALLOC(pga, npages * sizeof(*pga));
/* build one brw_page per PAGE_SIZE chunk of the transfer */
446 for (i = 0, pgp = pga, off = offset;
448 i++, pgp++, off += PAGE_SIZE) {
450 LASSERT (pgp->pg == NULL); /* for cleanup */
453 pgp->pg = alloc_pages (gfp_mask, 0);
457 pgp->count = PAGE_SIZE;
462 void *addr = kmap(pgp->pg);
463 obd_off stripe_off = off;
464 obd_id stripe_id = oa->o_id;
/* for writes the page is stamped with the per-stripe offset and id */
466 if (rw == OBD_BRW_WRITE) {
467 echo_get_stripe_off_id(lsm, &stripe_off,
469 page_debug_setup(addr, pgp->count,
470 stripe_off, stripe_id);
472 page_debug_setup(addr, pgp->count,
480 set->brw_callback = ll_brw_sync_wait;
481 rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, set, NULL);
483 rc = ll_brw_sync_wait(set, CB_PHASE_START);
/* verification and page release share one pass over the array */
489 for (i = 0, pgp = pga; i < npages; i++, pgp++) {
494 void *addr = kmap(pgp->pg);
495 obd_off stripe_off = pgp->off;
496 obd_id stripe_id = oa->o_id;
499 echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
500 vrc = page_debug_check("test_brw", addr, pgp->count,
501 stripe_off, stripe_id);
502 if (vrc != 0 && rc == 0)
507 __free_pages(pgp->pg, 0);
509 OBD_FREE(pga, npages * sizeof(*pga));
511 obd_brw_set_free(set);
/* Bulk read or write directly to or from a user-space buffer.
 *
 * obd:    echo client device; I/O goes through ec->ec_conn.
 * rw:     OBD_BRW_WRITE or the value tested on the missing half of the
 *         LASSERT (presumably OBD_BRW_READ).
 * oa:     object attributes; o_id must match lsm->lsm_object_id when lsm
 *         is not NULL.
 * lsm:    stripe metadata passed to obd_brw.
 * offset: starting offset of the transfer.
 * count:  length in bytes; tested for being a multiple of PAGE_SIZE.
 * buffer: user address; tested for page alignment.
 *
 * Visible flow: allocate a brw set, a brw_page array and a kiobuf, map the
 * user buffer with map_user_kiobuf, point each brw_page at the mapped page,
 * run obd_brw and wait with ll_brw_sync_wait, then unmap and free
 * everything.
 *
 * NOTE(review): many lines are missing from this view (return type, local
 * declarations, error handling, cleanup labels, the loop bound). */
516 echo_client_ubrw (struct obd_device *obd, int rw,
517 struct obdo *oa, struct lov_stripe_md *lsm,
518 obd_off offset, obd_size count, char *buffer)
520 struct echo_client_obd *ec = &obd->u.echo_client;
521 struct obd_brw_set *set;
523 struct brw_page *pga;
524 struct brw_page *pgp;
526 struct kiobuf *kiobuf;
530 LASSERT (rw == OBD_BRW_WRITE ||
533 /* NB: for now, only whole pages, page aligned */
536 ((long)buffer & (PAGE_SIZE - 1)) != 0 ||
537 (count & (PAGE_SIZE - 1)) != 0 ||
538 (lsm != NULL && lsm->lsm_object_id != oa->o_id))
541 set = obd_brw_set_new();
545 /* XXX think again with misaligned I/O */
546 npages = count >> PAGE_SHIFT;
549 OBD_ALLOC(pga, npages * sizeof(*pga));
553 rc = alloc_kiovec (1, &kiobuf);
/* the kiobuf direction is the memory direction: a BRW read stores into
 * the user pages, hence READ */
557 rc = map_user_kiobuf ((rw == OBD_BRW_READ) ? READ : WRITE,
558 kiobuf, (unsigned long)buffer, count);
/* follows from the alignment checks above: whole pages, no leading offset */
562 LASSERT (kiobuf->offset == 0);
563 LASSERT (kiobuf->nr_pages == npages);
565 for (i = 0, off = offset, pgp = pga;
567 i++, off += PAGE_SIZE, pgp++) {
569 pgp->pg = kiobuf->maplist[i];
570 pgp->count = PAGE_SIZE;
574 set->brw_callback = ll_brw_sync_wait;
575 rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, set, NULL);
578 rc = ll_brw_sync_wait(set, CB_PHASE_START);
/* NOTE(review): marking the pages dirty after a read is commented out
 * below; confirm that user pages filled by a read need no dirty marking
 * before unmap_kiobuf. */
580 // if (rw == OBD_BRW_READ)
581 // mark_dirty_kiobuf (kiobuf, count);
583 unmap_kiobuf (kiobuf);
585 free_kiovec (1, &kiobuf);
587 OBD_FREE(pga, npages * sizeof(*pga));
589 obd_brw_set_free(set);
/* Open an echo object on behalf of an export and hand back an open handle.
 *
 * exp: export doing the open; the handle is linked on
 *      exp->exp_ec_data.eced_open_head.
 * oa:  object attributes; the handle is written into obdo_handle (oa).
 *
 * Visible flow: take an object reference with echo_get_object, allocate an
 * ec_open_object, call obd_open, remember a copy of oa and the object, then
 * under ec->ec_lock link the open record on the export and fill the handle
 * with the record's address and a cookie taken from ec->ec_unique.
 * The failure path frees the record and drops the object reference.
 *
 * NOTE(review): the return type, the declaration of rc, the error tests,
 * the labels and the returns are not visible in this view. */
594 echo_open (struct obd_export *exp, struct obdo *oa)
596 struct obd_device *obd = exp->exp_obd;
597 struct echo_client_obd *ec = &obd->u.echo_client;
598 struct lustre_handle *ufh = obdo_handle (oa);
599 struct ec_open_object *ecoo;
600 struct ec_object *eco;
603 rc = echo_get_object (&eco, obd, oa);
608 OBD_ALLOC (ecoo, sizeof (*ecoo));
612 rc = obd_open (&ec->ec_conn, oa, eco->eco_lsm, NULL);
616 memcpy (&ecoo->ecoo_oa, oa, sizeof (*oa));
617 ecoo->ecoo_object = eco;
618 /* ecoo takes ref from echo_get_object() above */
620 spin_lock (&ec->ec_lock);
622 list_add (&ecoo->ecoo_exp_chain,
623 &exp->exp_ec_data.eced_open_head);
/* the handle is the record address plus a cookie; echo_close checks both */
625 ufh->addr = (__u64)((long) ecoo);
626 ufh->cookie = ecoo->ecoo_cookie = ec->ec_unique++;
628 spin_unlock (&ec->ec_lock);
632 OBD_FREE (ecoo, sizeof (*ecoo));
634 echo_put_object (eco);
/* Close an open handle previously produced by echo_open.
 *
 * exp: export that owns the open record.
 * oa:  object attributes; o_valid is tested for OBD_MD_FLHANDLE and the
 *      handle is read from obdo_handle (oa).
 *
 * Visible flow: under ec->ec_lock walk the export's open list for a record
 * whose address equals the handle address, compare cookies and unlink the
 * record; then call obd_close with the saved attributes, drop the object
 * reference and free the record.
 *
 * NOTE(review): the return type, the declarations of found and rc, the
 * tests on found, the loop exit and the returns are not visible in this
 * view. */
639 echo_close (struct obd_export *exp, struct obdo *oa)
641 struct obd_device *obd = exp->exp_obd;
642 struct echo_client_obd *ec = &obd->u.echo_client;
643 struct lustre_handle *ufh = obdo_handle (oa);
644 struct ec_open_object *ecoo = NULL;
646 struct list_head *el;
649 if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
652 spin_lock (&ec->ec_lock);
/* the user-supplied address is only compared against list members and is
 * not dereferenced before a match is found */
654 list_for_each (el, &exp->exp_ec_data.eced_open_head) {
655 ecoo = list_entry (el, struct ec_open_object, ecoo_exp_chain);
656 if ((__u64)((long)ecoo) == ufh->addr) {
657 found = (ecoo->ecoo_cookie == ufh->cookie);
659 list_del (&ecoo->ecoo_exp_chain);
664 spin_unlock (&ec->ec_lock);
669 rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
670 ecoo->ecoo_object->eco_lsm, NULL);
/* release the reference that echo_open transferred to the open record */
672 echo_put_object (ecoo->ecoo_object);
673 OBD_FREE (ecoo, sizeof (*ecoo));
/* Lock callback passed to obd_enqueue by echo_enqueue.
 *
 * lock: lock the callback is about; converted to a handle with
 *       ldlm_lock2handle.
 * new:  not used in the visible lines.
 * data: the ec_object registered with the lock.
 * flag: presumably the value switched on below (LDLM_CB_BLOCKING or
 *       LDLM_CB_CANCELING); the switch head is not visible in this view.
 *
 * Visible flow: check under ec->ec_lock whether the object is on the
 * device's object list; on a blocking callback cancel the lock with
 * ldlm_cli_cancel and log a failure; on a canceling callback only log.
 *
 * NOTE(review): the return type, the declarations of found and rc, the
 * handling of a failed lookup and the returns are not visible in this
 * view. */
679 echo_ldlm_callback (struct ldlm_lock *lock, struct ldlm_lock_desc *new,
680 void *data, int flag)
682 struct ec_object *eco = (struct ec_object *)data;
683 struct echo_client_obd *ec = &(eco->eco_device->u.echo_client);
684 struct lustre_handle lockh;
685 struct list_head *el;
689 ldlm_lock2handle (lock, &lockh);
691 /* #ifdef this out if we're not feeling paranoid */
692 spin_lock (&ec->ec_lock);
/* only the pointer value is compared; eco is not dereferenced in the loop */
693 list_for_each (el, &ec->ec_objects) {
694 found = (eco == list_entry (el, struct ec_object, eco_obj_chain));
698 spin_unlock (&ec->ec_lock);
702 case LDLM_CB_BLOCKING:
703 CDEBUG (D_INFO, "blocking callback on "LPX64", handle "LPX64"."LPX64"\n",
704 eco->eco_id, lockh.addr, lockh.cookie);
705 rc = ldlm_cli_cancel (&lockh);
707 CERROR ("ldlm_cli_cancel failed: %d\n", rc);
710 case LDLM_CB_CANCELING:
711 CDEBUG (D_INFO, "canceling callback on "LPX64", handle "LPX64"."LPX64"\n",
712 eco->eco_id, lockh.addr, lockh.cookie);
/* Take an extent lock on an echo object on behalf of an export.
 *
 * exp:    export that will own the lock record.
 * oa:     object attributes; on success the lock handle is stored in
 *         obdo_handle (oa) and OBD_MD_FLHANDLE is set in o_valid.
 * mode:   tested against LCK_PR and LCK_PW.
 * offset: start of the extent; tested for page alignment.
 * nob:    extent length in bytes, tested for page alignment; 0 selects an
 *         extent ending at (obd_off)-1.
 *
 * Visible flow: take an object reference, allocate an ec_lock, call
 * obd_enqueue with echo_ldlm_callback, then under ec->ec_lock link the
 * record on exp->exp_ec_data.eced_locks and fill the user handle with the
 * record's address and a cookie from ec->ec_unique. The failure path frees
 * the record and drops the object reference.
 *
 * NOTE(review): the return type, the declarations of ecl, flags and rc,
 * the tail of the obd_enqueue call, the error tests and the returns are
 * not visible in this view. */
723 echo_enqueue (struct obd_export *exp, struct obdo *oa,
724 int mode, obd_off offset, obd_size nob)
726 struct obd_device *obd = exp->exp_obd;
727 struct echo_client_obd *ec = &obd->u.echo_client;
728 struct lustre_handle *ulh = obdo_handle (oa);
729 struct ec_object *eco;
734 if (!(mode == LCK_PR || mode == LCK_PW))
737 if ((offset & (PAGE_SIZE - 1)) != 0 ||
738 (nob & (PAGE_SIZE - 1)) != 0)
741 rc = echo_get_object (&eco, obd, oa);
746 OBD_ALLOC (ecl, sizeof (*ecl));
750 ecl->ecl_mode = mode;
751 ecl->ecl_object = eco;
752 ecl->ecl_extent.start = offset;
/* the extent end is inclusive, hence the -1 */
753 ecl->ecl_extent.end = (nob == 0) ? ((obd_off)-1) : (offset + nob - 1);
756 rc = obd_enqueue (&ec->ec_conn, eco->eco_lsm, NULL,
757 LDLM_EXTENT, &ecl->ecl_extent, sizeof (ecl->ecl_extent),
758 mode, &flags, echo_ldlm_callback, eco, sizeof (*eco),
763 CDEBUG (D_INFO, "enqueue handle "LPX64"."LPX64"\n",
764 ecl->ecl_handle.addr, ecl->ecl_handle.cookie);
766 /* NB ecl takes object ref from echo_get_object() above */
768 spin_lock (&ec->ec_lock);
770 list_add (&ecl->ecl_exp_chain, &exp->exp_ec_data.eced_locks);
/* the handle is the record address plus a cookie; echo_cancel checks both */
772 ulh->addr = (__u64)((long)ecl);
773 ulh->cookie = ecl->ecl_cookie = ec->ec_unique++;
775 spin_unlock (&ec->ec_lock);
777 oa->o_valid |= OBD_MD_FLHANDLE;
781 OBD_FREE (ecl, sizeof (*ecl));
783 echo_put_object (eco);
/* Cancel a lock previously taken with echo_enqueue.
 *
 * exp: export that owns the lock record.
 * oa:  object attributes; o_valid is tested for OBD_MD_FLHANDLE and the
 *      handle is read from obdo_handle (oa).
 *
 * Visible flow: under ec->ec_lock walk the export's lock list for a record
 * whose address equals the handle address, compare cookies and unlink the
 * record; then call obd_cancel, drop the object reference and free the
 * record.
 *
 * NOTE(review): the return type, the declarations of found and rc, the
 * tests on found, the tail of the obd_cancel call and the returns are not
 * visible in this view. */
788 echo_cancel (struct obd_export *exp, struct obdo *oa)
790 struct obd_device *obd = exp->exp_obd;
791 struct echo_client_obd *ec = &obd->u.echo_client;
792 struct lustre_handle *ulh = obdo_handle (oa);
793 struct ec_lock *ecl = NULL;
795 struct list_head *el;
798 if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
801 spin_lock (&ec->ec_lock);
/* the user-supplied address is only compared against list members and is
 * not dereferenced before a match is found */
803 list_for_each (el, &exp->exp_ec_data.eced_locks) {
804 ecl = list_entry (el, struct ec_lock, ecl_exp_chain);
806 if ((__u64)((long)ecl) == ulh->addr) {
807 found = (ecl->ecl_cookie == ulh->cookie);
809 list_del (&ecl->ecl_exp_chain);
814 spin_unlock (&ec->ec_lock);
819 rc = obd_cancel (&ec->ec_conn,
820 ecl->ecl_object->eco_lsm,
/* release the reference that echo_enqueue transferred to the lock record */
824 echo_put_object (ecl->ecl_object);
825 OBD_FREE (ecl, sizeof (*ecl));
/* ioctl dispatcher for the echo client device (installed as o_iocontrol
 * in echo_obd_ops).
 *
 * cmd:     ioctl command; the visible cases are listed below.
 * obdconn: connection handle, resolved to an export with
 *          class_conn2export.
 * len:     not used in the visible lines.
 * karg:    kernel copy of the request, used as struct obd_ioctl_data.
 * uarg:    not used in the visible lines.
 *
 * Visible commands: OBD_IOC_CREATE, OBD_IOC_DESTROY, OBD_IOC_GETATTR,
 * OBD_IOC_SETATTR, two cases that call echo_open and echo_close (their
 * labels are not visible), OBD_IOC_BRW_WRITE, OBD_IOC_BRW_READ,
 * ECHO_IOC_GET_STRIPE, ECHO_IOC_SET_STRIPE, ECHO_IOC_ENQUEUE and
 * ECHO_IOC_CANCEL. Create, destroy, setattr, write, set-stripe and enqueue
 * require CAP_SYS_ADMIN and fail with -EPERM otherwise. An unrecognised
 * command yields -ENOTTY.
 *
 * NOTE(review): many lines are missing from this view (the declaration of
 * rc, the NULL tests on exp and obd, the switch head, the tests on rc
 * after echo_get_object, the break statements, the out label). */
830 static int echo_iocontrol(unsigned int cmd, struct lustre_handle *obdconn,
831 int len, void *karg, void *uarg)
833 struct obd_export *exp = class_conn2export (obdconn);
834 struct obd_device *obd;
835 struct echo_client_obd *ec;
836 struct ec_object *eco;
837 struct obd_ioctl_data *data = karg;
/* default direction; presumably switched to write on the lines missing
 * after the OBD_IOC_BRW_WRITE permission check -- TODO confirm */
838 int rw = OBD_BRW_READ;
843 CERROR("ioctl: No device\n");
844 GOTO(out, rc = -EINVAL);
848 ec = &obd->u.echo_client;
851 case OBD_IOC_CREATE: /* may create echo object */
852 if (!capable (CAP_SYS_ADMIN))
853 GOTO (out, rc = -EPERM);
855 rc = echo_create_object (obd, 1, &data->ioc_obdo1,
856 data->ioc_pbuf1, data->ioc_plen1);
859 case OBD_IOC_DESTROY:
860 if (!capable (CAP_SYS_ADMIN))
861 GOTO (out, rc = -EPERM);
863 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
865 rc = obd_destroy(&ec->ec_conn, &data->ioc_obdo1,
/* mark the local object deleted, then drop the reference taken above */
868 eco->eco_deleted = 1;
869 echo_put_object(eco);
873 case OBD_IOC_GETATTR:
874 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
876 rc = obd_getattr(&ec->ec_conn, &data->ioc_obdo1,
878 echo_put_object(eco);
882 case OBD_IOC_SETATTR:
883 if (!capable (CAP_SYS_ADMIN))
884 GOTO (out, rc = -EPERM);
886 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
888 rc = obd_setattr(&ec->ec_conn, &data->ioc_obdo1,
890 echo_put_object(eco);
895 rc = echo_open (exp, &data->ioc_obdo1);
899 rc = echo_close (exp, &data->ioc_obdo1);
902 case OBD_IOC_BRW_WRITE:
903 if (!capable (CAP_SYS_ADMIN))
904 GOTO (out, rc = -EPERM);
/* NOTE(review): no break is visible between the write and read cases;
 * presumably an intentional fall-through into the shared I/O code. */
908 case OBD_IOC_BRW_READ:
909 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
/* without a user buffer the kernel-page variant is used */
911 if (data->ioc_pbuf2 == NULL) // NULL user data pointer
912 rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
917 rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
922 echo_put_object(eco);
926 case ECHO_IOC_GET_STRIPE:
927 rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
929 rc = echo_copyout_lsm(eco->eco_lsm, data->ioc_pbuf1,
931 echo_put_object(eco);
935 case ECHO_IOC_SET_STRIPE:
936 if (!capable (CAP_SYS_ADMIN))
937 GOTO (out, rc = -EPERM);
939 if (data->ioc_pbuf1 == NULL) { /* unset */
940 rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
942 eco->eco_deleted = 1;
943 echo_put_object(eco);
946 rc = echo_create_object(obd, 0, &data->ioc_obdo1,
947 data->ioc_pbuf1, data->ioc_plen1);
951 case ECHO_IOC_ENQUEUE:
952 if (!capable (CAP_SYS_ADMIN))
953 GOTO (out, rc = -EPERM);
955 rc = echo_enqueue (exp, &data->ioc_obdo1,
956 data->ioc_conn1, /* lock mode */
957 data->ioc_offset, data->ioc_count); /* extent */
960 case ECHO_IOC_CANCEL:
961 rc = echo_cancel (exp, &data->ioc_obdo1);
965 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
966 GOTO (out, rc = -ENOTTY);
/* Set up an echo client device by connecting it to a target OBD
 * (installed as o_setup in echo_obd_ops).
 *
 * obddev: device being set up; its u.echo_client state is initialised.
 * len:    not used in the visible lines.
 * buf:    struct obd_ioctl_data whose ioc_inlbuf1 carries the target UUID
 *         and whose ioc_inllen1 is tested against 1 and 37.
 *
 * Visible flow: validate the UUID length, resolve the target with
 * class_uuid2obd and require OBD_ATTACHED and OBD_SET_UP, initialise the
 * lock and the object list, connect to the target, then query the stripe
 * metadata size with obd_alloc_memmd and record the stripe count before
 * freeing the sample metadata.
 *
 * NOTE(review): the declaration of rc, several returns, the test on rc
 * after obd_connect and the else branch head are not visible in this
 * view. */
973 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
975 struct obd_ioctl_data* data = buf;
976 struct echo_client_obd *ec = &obddev->u.echo_client;
977 struct obd_device *tgt;
978 struct obd_uuid uuid;
979 struct lov_stripe_md *lsm = NULL;
980 struct obd_uuid echo_uuid = { "ECHO_UUID" };
984 if (data->ioc_inllen1 < 1) {
985 CERROR("requires a TARGET OBD UUID\n");
988 if (data->ioc_inllen1 > 37) {
989 CERROR("OBD UUID must be less than 38 characters\n");
993 obd_str2uuid(&uuid, data->ioc_inlbuf1);
994 tgt = class_uuid2obd(&uuid);
995 if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
996 !(tgt->obd_flags & OBD_SET_UP)) {
997 CERROR("device not attached or not set up (%d)\n",
999 RETURN(rc = -EINVAL);
1002 spin_lock_init (&ec->ec_lock);
1003 INIT_LIST_HEAD (&ec->ec_objects);
1006 rc = obd_connect(&ec->ec_conn, tgt, &echo_uuid, NULL, NULL);
1008 CERROR("fail to connect to device %d\n", data->ioc_dev);
/* a negative size is treated as an error code from obd_alloc_memmd */
1012 ec->ec_lsmsize = obd_alloc_memmd (&ec->ec_conn, &lsm);
1013 if (ec->ec_lsmsize < 0) {
/* NOTE(review): rc still holds the obd_connect result at this point; the
 * failure value is in ec->ec_lsmsize and is copied into rc only two lines
 * below, so the number logged here looks wrong -- confirm and log
 * ec->ec_lsmsize instead. */
1014 CERROR ("Can't get # stripes: %d\n", rc);
1015 obd_disconnect (&ec->ec_conn);
1016 rc = ec->ec_lsmsize;
1018 ec->ec_nstripes = lsm->lsm_stripe_count;
1019 obd_free_memmd (&ec->ec_conn, &lsm);
/* Tear down an echo client device (installed as o_cleanup in
 * echo_obd_ops).
 *
 * obddev: device being cleaned up.
 *
 * Visible flow: complain if the device still has exports; otherwise drain
 * ec->ec_objects by giving each unreferenced object one reference, marking
 * it deleted and releasing it through echo_put_object; finally disconnect
 * from the target and log a failure.
 *
 * NOTE(review): the declaration of rc, the action taken when exports
 * remain, the test on rc after obd_disconnect and the returns are not
 * visible in this view. */
1025 static int echo_cleanup(struct obd_device * obddev)
1027 struct list_head *el;
1028 struct ec_object *eco;
1029 struct echo_client_obd *ec = &obddev->u.echo_client;
1033 if (!list_empty(&obddev->obd_exports)) {
1034 CERROR("still has clients!\n");
1038 /* XXX assuming sole access */
1039 while (!list_empty (&ec->ec_objects)) {
1040 el = ec->ec_objects.next;
1041 eco = list_entry (el, struct ec_object, eco_obj_chain);
/* fake a last reference so that echo_put_object runs its destroy path */
1043 LASSERT (eco->eco_refcount == 0);
1044 eco->eco_refcount = 1;
1045 eco->eco_deleted = 1;
1046 echo_put_object (eco);
1049 rc = obd_disconnect (&ec->ec_conn);
1051 CERROR("fail to disconnect device: %d\n", rc);
/* Connect a client to the echo client device (installed as o_connect in
 * echo_obd_ops).
 *
 * conn:    connection handle filled in by class_connect.
 * src:     device being connected to.
 * cluuid:  client UUID passed through to class_connect.
 * recovd:  not used in the visible lines.
 * recover: not used in the visible lines.
 *
 * After class_connect the new export's open-handle list and lock list are
 * initialised empty.
 *
 * NOTE(review): the declaration of rc, the test on rc after class_connect
 * and the return are not visible in this view. */
1056 static int echo_connect(struct lustre_handle *conn, struct obd_device *src,
1057 struct obd_uuid *cluuid, struct recovd_obd *recovd,
1058 ptlrpc_recovery_cb_t recover)
1060 struct obd_export *exp;
1063 rc = class_connect(conn, src, cluuid);
1065 exp = class_conn2export (conn);
1066 INIT_LIST_HEAD (&exp->exp_ec_data.eced_open_head);
1067 INIT_LIST_HEAD (&exp->exp_ec_data.eced_locks);
/* Disconnect a client from the echo client device, releasing everything
 * the export still holds (installed as o_disconnect in echo_obd_ops).
 *
 * conn: connection handle, resolved to an export with class_conn2export.
 *
 * Visible flow: for every lock record left on the export, unlink it, call
 * obd_cancel, drop the object reference and free the record; then do the
 * same for every open record using obd_close; finally call
 * class_disconnect.
 *
 * NOTE(review): the declaration of rc, the NULL test on exp, the
 * assignment of obd, the test guarding the CERROR and the return are not
 * visible in this view. */
1073 static int echo_disconnect(struct lustre_handle *conn)
1075 struct obd_export *exp = class_conn2export (conn);
1076 struct obd_device *obd;
1077 struct echo_client_obd *ec;
1078 struct ec_open_object *ecoo;
1079 struct ec_lock *ecl;
1086 ec = &obd->u.echo_client;
1088 /* no more contention on export's lock list */
1089 while (!list_empty (&exp->exp_ec_data.eced_locks)) {
1090 ecl = list_entry (exp->exp_ec_data.eced_locks.next,
1091 struct ec_lock, ecl_exp_chain);
1092 list_del (&ecl->ecl_exp_chain);
1094 rc = obd_cancel (&ec->ec_conn, ecl->ecl_object->eco_lsm,
1095 ecl->ecl_mode, &ecl->ecl_handle);
1097 CERROR ("Cancel lock on object "LPX64" on disconnect (%d)\n",
1098 ecl->ecl_object->eco_id, rc);
/* release the reference the lock record was holding */
1100 echo_put_object (ecl->ecl_object);
1101 OBD_FREE (ecl, sizeof (*ecl));
1104 /* no more contention on export's open handle list */
1105 while (!list_empty (&exp->exp_ec_data.eced_open_head)) {
1106 ecoo = list_entry (exp->exp_ec_data.eced_open_head.next,
1107 struct ec_open_object, ecoo_exp_chain);
1108 list_del (&ecoo->ecoo_exp_chain);
1110 rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
1111 ecoo->ecoo_object->eco_lsm, NULL);
1113 CDEBUG (D_INFO, "Closed object "LPX64" on disconnect (%d)\n",
1114 ecoo->ecoo_oa.o_id, rc);
/* release the reference the open record was holding */
1116 echo_put_object (ecoo->ecoo_object);
1117 OBD_FREE (ecoo, sizeof (*ecoo));
1120 rc = class_disconnect (conn);
/* Method table for the echo client device type; it is handed to
 * class_register_type in echo_client_init. Only setup, cleanup, iocontrol,
 * connect and disconnect are provided here.
 *
 * NOTE(review): the closing brace of the initialiser is not visible in
 * this view. */
1124 static struct obd_ops echo_obd_ops = {
1125 o_owner: THIS_MODULE,
1126 o_setup: echo_setup,
1127 o_cleanup: echo_cleanup,
1128 o_iocontrol: echo_iocontrol,
1129 o_connect: echo_connect,
1130 o_disconnect: echo_disconnect
/* Register the echo client device type.
 *
 * Returns the result of class_register_type, called with echo_obd_ops,
 * the lprocfs module variables and OBD_ECHO_CLIENT_DEVICENAME.
 *
 * NOTE(review): the braces are not visible in this view. */
1133 int echo_client_init(void)
1135 struct lprocfs_static_vars lvars;
/* fills lvars; only lvars.module_vars is used below */
1137 lprocfs_init_vars(&lvars);
1138 return class_register_type(&echo_obd_ops, lvars.module_vars,
1139 OBD_ECHO_CLIENT_DEVICENAME);
/* Unregister the echo client device type registered by echo_client_init.
 *
 * NOTE(review): the braces are not visible in this view. */
1142 void echo_client_cleanup(void)
1144 class_unregister_type(OBD_ECHO_CLIENT_DEVICENAME);