1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 #define DEBUG_SUBSYSTEM S_ECHO
24 #include <linux/version.h>
25 #include <linux/module.h>
27 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
28 #include <linux/iobuf.h>
30 #include <asm/div64.h>
32 #include <liblustre.h>
35 #include <linux/obd.h>
36 #include <linux/obd_support.h>
37 #include <linux/obd_class.h>
38 #include <linux/obd_echo.h>
39 #include <linux/lustre_debug.h>
40 #include <linux/lprocfs_status.h>
41 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_SETSTRIPE */
/* Debug helper: dump one echo-client object and its stripe descriptor to the
 * kernel log at KERN_INFO.
 *
 * msg: caller-supplied prefix printed at the start of the summary line.
 * eco: object to dump; eco->eco_lsm is dereferenced unconditionally.
 *
 * The summary line shows the object pointer, its id, its reference count and
 * a "(deleted) " tag when eco_deleted is set, followed by the descriptor's
 * object id, stripe size, stripe count and stripe offset.
 *
 * NOTE(review): the lines surrounding this function are not part of this
 * listing, so whether it is compiled in at all needs confirming.
 */
45 echo_printk_object (char *msg, struct ec_object *eco)
47 struct lov_stripe_md *lsm = eco->eco_lsm;
50 printk (KERN_INFO "%s: object %p: "LPX64", refs %d%s: "LPX64
51 "=%u!%u@%d\n", msg, eco, eco->eco_id, eco->eco_refcount,
52 eco->eco_deleted ? "(deleted) " : "",
53 lsm->lsm_object_id, lsm->lsm_stripe_size,
54 lsm->lsm_stripe_count, lsm->lsm_stripe_offset);
/* one line per stripe: OST index in brackets, then that stripe's object id */
56 for (i = 0; i < lsm->lsm_stripe_count; i++)
57 printk (KERN_INFO " [%2u]"LPX64"\n",
58 lsm->lsm_oinfo[i].loi_ost_idx,
59 lsm->lsm_oinfo[i].loi_id);
63 static struct ec_object *
64 echo_find_object_locked (struct obd_device *obd, obd_id id)
66 struct echo_client_obd *ec = &obd->u.echo_client;
67 struct ec_object *eco = NULL;
70 list_for_each (el, &ec->ec_objects) {
71 eco = list_entry (el, struct ec_object, eco_obj_chain);
73 if (eco->eco_id == id)
/* Copy an in-kernel stripe descriptor out to a user buffer.
 *
 * lsm:      descriptor to export.
 * ulsm:     destination address in user space.
 * ulsm_nob: size of the user buffer in bytes.
 *
 * nob is the offset of the entry just past the last used lsm_oinfo[] slot,
 * i.e. the descriptor header plus lsm_stripe_count stripe entries; that many
 * bytes are handed to copy_to_user().
 *
 * NOTE(review): the statements that consume ulsm_nob and the return paths
 * are not part of this listing -- presumably nob is checked against ulsm_nob
 * before the copy; confirm.
 */
80 echo_copyout_lsm (struct lov_stripe_md *lsm, void *ulsm, int ulsm_nob)
84 nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
88 if (copy_to_user (ulsm, lsm, nob))
95 echo_copyin_lsm (struct obd_device *obd, struct lov_stripe_md *lsm,
96 void *ulsm, int ulsm_nob)
98 struct echo_client_obd *ec = &obd->u.echo_client;
101 if (ulsm_nob < sizeof (*lsm))
104 if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
107 nob = lsm->lsm_stripe_count * sizeof (lsm->lsm_oinfo[0]);
109 if (ulsm_nob < nob ||
110 lsm->lsm_stripe_count > ec->ec_nstripes ||
111 lsm->lsm_magic != LOV_MAGIC ||
112 (lsm->lsm_stripe_offset != 0 &&
113 lsm->lsm_stripe_offset != 0xffffffff &&
114 lsm->lsm_stripe_offset >= ec->ec_nstripes) ||
115 (lsm->lsm_stripe_size & (PAGE_SIZE - 1)) != 0 ||
116 ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
119 LASSERT (ec->ec_lsmsize >= sizeof (*lsm) + nob);
121 if (copy_from_user(lsm->lsm_oinfo,
122 ((struct lov_stripe_md *)ulsm)->lsm_oinfo, nob))
128 static struct ec_object *
129 echo_allocate_object (struct obd_device *obd)
131 struct echo_client_obd *ec = &obd->u.echo_client;
132 struct ec_object *eco;
134 OBD_ALLOC (eco, sizeof (*eco));
138 OBD_ALLOC (eco->eco_lsm, ec->ec_lsmsize);
139 if (eco->eco_lsm == NULL) {
140 OBD_FREE (eco, sizeof (*eco));
144 eco->eco_device = obd;
145 eco->eco_deleted = 0;
146 eco->eco_refcount = 0;
147 eco->eco_lsm->lsm_magic = LOV_MAGIC;
148 /* leave stripe count 0 by default */
154 echo_free_object (struct ec_object *eco)
156 struct obd_device *obd = eco->eco_device;
157 struct echo_client_obd *ec = &obd->u.echo_client;
159 LASSERT (eco->eco_refcount == 0);
160 OBD_FREE (eco->eco_lsm, ec->ec_lsmsize);
161 OBD_FREE (eco, sizeof (*eco));
/* Create an echo-client object and add it to the client's object list.
 *
 * obd:       echo client device.
 * on_target: the ioctl handler passes 1 for OBD_IOC_CREATE and 0 for
 *            ECHO_IOC_SET_STRIPE; it also selects the " (undoing create)"
 *            suffix of the conflict message.
 * oa:        object attributes; o_id is used when OBD_MD_FLID is set.
 * ulsm:      user-space stripe descriptor handed to echo_copyin_lsm().
 * ulsm_nob:  size of that user buffer in bytes.
 *
 * Steps visible here: validate that an object id is present where one is
 * needed, allocate the object, copy in the user's stripe descriptor, replace
 * default stripe offset/count/size with real values, fill in per-stripe ids
 * and OST indices, create the object with obd_create(), and finally publish
 * it on ec_objects unless the id is already taken.
 *
 * NOTE(review): the braces, short conditionals, gotos and return statements
 * of this function are absent from this listing, so which of these steps are
 * conditional (for instance on on_target or on ulsm being non-NULL) and the
 * exact error codes need confirming.
 */
165 echo_create_object (struct obd_device *obd, int on_target, struct obdo *oa,
166 void *ulsm, int ulsm_nob)
168 struct echo_client_obd *ec = &obd->u.echo_client;
169 struct ec_object *eco2;
170 struct ec_object *eco;
171 struct lov_stripe_md *lsm;
/* an object id is mandatory when on_target is set or the target is striped */
175 if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
176 (on_target || /* set_stripe */
177 ec->ec_nstripes != 0)) { /* LOV */
178 CERROR ("No valid oid\n");
182 eco = echo_allocate_object (obd);
189 rc = echo_copyin_lsm (obd, lsm, ulsm, ulsm_nob);
194 /* setup object ID here for !on_target and LOV hint */
195 if ((oa->o_valid & OBD_MD_FLID) != 0)
196 eco->eco_id = lsm->lsm_object_id = oa->o_id;
198 /* defaults -> actual values */
199 if (lsm->lsm_stripe_offset == 0xffffffff)
200 lsm->lsm_stripe_offset = 0;
202 if (lsm->lsm_stripe_count == 0)
203 lsm->lsm_stripe_count = ec->ec_nstripes;
205 if (lsm->lsm_stripe_size == 0)
206 lsm->lsm_stripe_size = PAGE_SIZE;
208 /* setup stripes: indices + default ids if required */
209 for (i = 0; i < lsm->lsm_stripe_count; i++) {
210 if (lsm->lsm_oinfo[i].loi_id == 0)
211 lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id;
213 lsm->lsm_oinfo[i].loi_ost_idx =
214 (lsm->lsm_stripe_offset + i) % ec->ec_nstripes;
218 rc = obd_create (&ec->ec_conn, oa, &lsm, NULL);
222 /* See what object ID we were given */
223 LASSERT ((oa->o_valid & OBD_MD_FLID) != 0);
224 eco->eco_id = lsm->lsm_object_id = oa->o_id;
/* publish the object, unless another object already uses this id */
227 spin_lock (&ec->ec_lock);
229 eco2 = echo_find_object_locked (obd, oa->o_id);
230 if (eco2 != NULL) { /* conflict */
231 spin_unlock (&ec->ec_lock);
233 CERROR ("Can't create object id "LPX64": id already exists%s\n",
234 oa->o_id, on_target ? " (undoing create)" : "");
237 obd_destroy (&ec->ec_conn, oa, lsm, NULL);
243 list_add (&eco->eco_obj_chain, &ec->ec_objects);
244 spin_unlock (&ec->ec_lock);
246 "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
248 eco->eco_lsm->lsm_stripe_size,
249 eco->eco_lsm->lsm_stripe_count,
250 eco->eco_lsm->lsm_stripe_offset,
251 eco->eco_refcount, eco->eco_deleted);
/* NOTE(review): presumably the shared failure exit, freeing the object that
 * was never published -- the label is not visible here */
255 echo_free_object (eco);
260 echo_get_object (struct ec_object **ecop, struct obd_device *obd,
263 struct echo_client_obd *ec = &obd->u.echo_client;
264 struct ec_object *eco;
265 struct ec_object *eco2;
268 if ((oa->o_valid & OBD_MD_FLID) == 0)
270 CERROR ("No valid oid\n");
274 spin_lock (&ec->ec_lock);
275 eco = echo_find_object_locked (obd, oa->o_id);
277 if (eco->eco_deleted) /* being deleted */
278 return (-EAGAIN); /* (see comment in cleanup) */
281 spin_unlock (&ec->ec_lock);
284 "found %p: "LPX64"=%u#%u&%d refs %d del %d\n",
286 eco->eco_lsm->lsm_stripe_size,
287 eco->eco_lsm->lsm_stripe_count,
288 eco->eco_lsm->lsm_stripe_offset,
289 eco->eco_refcount, eco->eco_deleted);
292 spin_unlock (&ec->ec_lock);
294 if (ec->ec_nstripes != 0) /* striping required */
297 eco = echo_allocate_object (obd);
301 eco->eco_id = eco->eco_lsm->lsm_object_id = oa->o_id;
303 spin_lock (&ec->ec_lock);
305 eco2 = echo_find_object_locked (obd, oa->o_id);
306 if (eco2 == NULL) { /* didn't race */
307 list_add (&eco->eco_obj_chain, &ec->ec_objects);
308 spin_unlock (&ec->ec_lock);
309 eco->eco_refcount = 1;
312 "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
314 eco->eco_lsm->lsm_stripe_size,
315 eco->eco_lsm->lsm_stripe_count,
316 eco->eco_lsm->lsm_stripe_offset,
317 eco->eco_refcount, eco->eco_deleted);
321 if (eco2->eco_deleted)
322 rc = -EAGAIN; /* lose race */
324 eco2->eco_refcount++; /* take existing */
327 LASSERT (eco2->eco_id == eco2->eco_lsm->lsm_object_id);
329 "found(2) %p: "LPX64"=%u#%u&%d refs %d del %d\n",
331 eco2->eco_lsm->lsm_stripe_size,
332 eco2->eco_lsm->lsm_stripe_count,
333 eco2->eco_lsm->lsm_stripe_offset,
334 eco2->eco_refcount, eco2->eco_deleted);
337 spin_unlock (&ec->ec_lock);
339 echo_free_object (eco);
/* Release one reference on an echo-client object.
 *
 * Under ec_lock the reference count is checked (it must never be negative).
 * While references remain -- or a second condition, on a line this listing
 * does not show, holds -- the lock is simply dropped.  Otherwise unused
 * locks on the object are cancelled with obd_cancel_unused() while the
 * object is still on ec_objects, after which it is unlinked under ec_lock
 * and freed with echo_free_object().
 *
 * NOTE(review): the decrement of eco_refcount is not visible here;
 * presumably it sits between spin_lock() and the first LASSERT.  The
 * existing comment suggests the hidden condition tests eco_deleted.  Confirm
 * both.
 */
344 echo_put_object (struct ec_object *eco)
346 struct obd_device *obd = eco->eco_device;
347 struct echo_client_obd *ec = &obd->u.echo_client;
349 /* Release caller's ref on the object.
350 * delete => mark for deletion when last ref goes
353 spin_lock (&ec->ec_lock);
356 LASSERT (eco->eco_refcount >= 0);
358 if (eco->eco_refcount != 0 ||
360 spin_unlock (&ec->ec_lock);
364 spin_unlock (&ec->ec_lock);
366 /* NB leave obj in the object list. We must prevent anyone from
367 * attempting to enqueue on this object number until we can be
368 * sure there will be no more lock callbacks.
370 obd_cancel_unused (&ec->ec_conn, eco->eco_lsm, 0);
372 /* now we can let it go */
373 spin_lock (&ec->ec_lock);
374 list_del (&eco->eco_obj_chain);
375 spin_unlock (&ec->ec_lock);
377 LASSERT (eco->eco_refcount == 0);
379 echo_free_object (eco);
383 echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
385 unsigned long stripe_count;
386 unsigned long stripe_size;
388 unsigned long woffset;
392 if (lsm->lsm_stripe_count <= 1)
396 stripe_size = lsm->lsm_stripe_size;
397 stripe_count = lsm->lsm_stripe_count;
399 /* width = # bytes in all stripes */
400 width = stripe_size * stripe_count;
402 /* woffset = offset within a width; offset = whole number of widths */
403 woffset = do_div (offset, width);
405 stripe_index = woffset / stripe_size;
407 *idp = lsm->lsm_oinfo[stripe_index].loi_id;
408 *offp = offset * stripe_size + woffset % stripe_size;
/* Bulk read/write test using pages allocated in the kernel.
 *
 * obd:    echo client device.
 * rw:     OBD_BRW_WRITE or OBD_BRW_READ (asserted).
 * oa:     object attributes; o_id also selects the test mode: a non-zero id
 *         sets 'verify', and an odd id allocates the pages with GFP_HIGHUSER
 *         instead of GFP_KERNEL.
 * lsm:    stripe descriptor of the object.
 * offset: starting offset of the transfer.
 * count:  length in bytes; tested for PAGE_SIZE alignment.
 *
 * One page is allocated per PAGE_SIZE of count, the pages are described in a
 * brw_page array and submitted through obd_brw(), and completion is awaited
 * with ll_brw_sync_wait().  Afterwards the pages are checked with
 * page_debug_check() and released, and the array and the brw set are freed.
 *
 * NOTE(review): this listing lacks the function's braces, short
 * declarations, error checks and returns; which of the steps above are
 * guarded by 'verify' or by 'rc' cannot be told from here.
 */
412 echo_client_kbrw (struct obd_device *obd, int rw,
413 struct obdo *oa, struct lov_stripe_md *lsm,
414 obd_off offset, obd_size count)
416 struct echo_client_obd *ec = &obd->u.echo_client;
417 struct obd_brw_set *set;
419 struct brw_page *pga;
420 struct brw_page *pgp;
427 /* oa_id == 0 => speed test (no verification) else...
428 * oa & 1 => use HIGHMEM
430 verify = (oa->o_id != 0);
431 gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
433 LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
436 (count & (PAGE_SIZE - 1)) != 0 ||
438 lsm->lsm_object_id != oa->o_id))
441 set = obd_brw_set_new();
445 /* XXX think again with misaligned I/O */
446 npages = count >> PAGE_SHIFT;
449 OBD_ALLOC(pga, npages * sizeof(*pga));
/* build one brw_page per page of the transfer */
453 for (i = 0, pgp = pga, off = offset;
455 i++, pgp++, off += PAGE_SIZE) {
457 LASSERT (pgp->pg == NULL); /* for cleanup */
460 pgp->pg = alloc_pages (gfp_mask, 0);
464 pgp->count = PAGE_SIZE;
469 void *addr = kmap(pgp->pg);
470 obd_off stripe_off = off;
471 obd_id stripe_id = oa->o_id;
/* for writes the page is stamped with its stripe-relative offset and id */
473 if (rw == OBD_BRW_WRITE) {
474 echo_get_stripe_off_id(lsm, &stripe_off,
476 page_debug_setup(addr, pgp->count,
477 stripe_off, stripe_id);
479 page_debug_setup(addr, pgp->count,
/* submit the pages, then wait for completion */
487 set->brw_callback = ll_brw_sync_wait;
488 rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, set, NULL);
490 rc = ll_brw_sync_wait(set, CB_PHASE_START);
/* per page: check the contents against the stripe-relative offset and id,
 * then free the page */
496 for (i = 0, pgp = pga; i < npages; i++, pgp++) {
501 void *addr = kmap(pgp->pg);
502 obd_off stripe_off = pgp->off;
503 obd_id stripe_id = oa->o_id;
506 echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
507 vrc = page_debug_check("test_brw", addr, pgp->count,
508 stripe_off, stripe_id);
509 if (vrc != 0 && rc == 0)
514 __free_pages(pgp->pg, 0);
516 OBD_FREE(pga, npages * sizeof(*pga));
518 obd_brw_set_free(set);
/* Bulk read/write test on a user-space buffer.
 *
 * obd:    echo client device.
 * rw:     OBD_BRW_WRITE or, judging by the later READ/WRITE selection,
 *         OBD_BRW_READ.
 * oa:     object attributes; when lsm is non-NULL its object id is compared
 *         with oa->o_id.
 * lsm:    stripe descriptor of the object, may be NULL.
 * offset: starting offset of the transfer.
 * count:  length in bytes; tested for PAGE_SIZE alignment.
 * buffer: user-space address of the data; tested for PAGE_SIZE alignment.
 *
 * The variant below is compiled for kernels older than 2.5.0.  It maps the
 * user buffer with a kiobuf (alloc_kiovec / map_user_kiobuf), places the
 * kiobuf's pages in a brw_page array, sends them through obd_brw(), waits
 * with ll_brw_sync_wait(), and then unmaps and frees the kiobuf, the array
 * and the brw set.
 *
 * NOTE(review): braces, short declarations, error checks, gotos and returns
 * are absent from this listing; the unwinding order on failure needs
 * confirming.
 */
523 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
524 static int echo_client_ubrw(struct obd_device *obd, int rw,
525 struct obdo *oa, struct lov_stripe_md *lsm,
526 obd_off offset, obd_size count, char *buffer)
528 struct echo_client_obd *ec = &obd->u.echo_client;
529 struct obd_brw_set *set;
531 struct brw_page *pga;
532 struct brw_page *pgp;
534 struct kiobuf *kiobuf;
538 LASSERT (rw == OBD_BRW_WRITE ||
541 /* NB: for now, only whole pages, page aligned */
544 ((long)buffer & (PAGE_SIZE - 1)) != 0 ||
545 (count & (PAGE_SIZE - 1)) != 0 ||
546 (lsm != NULL && lsm->lsm_object_id != oa->o_id))
549 set = obd_brw_set_new();
553 /* XXX think again with misaligned I/O */
554 npages = count >> PAGE_SHIFT;
557 OBD_ALLOC(pga, npages * sizeof(*pga));
561 rc = alloc_kiovec (1, &kiobuf);
/* a bulk READ stores into the user buffer, hence the READ mapping */
565 rc = map_user_kiobuf ((rw == OBD_BRW_READ) ? READ : WRITE,
566 kiobuf, (unsigned long)buffer, count);
/* the alignment checks above imply a zero start offset and exactly npages */
570 LASSERT (kiobuf->offset == 0);
571 LASSERT (kiobuf->nr_pages == npages);
573 for (i = 0, off = offset, pgp = pga;
575 i++, off += PAGE_SIZE, pgp++) {
577 pgp->pg = kiobuf->maplist[i];
578 pgp->count = PAGE_SIZE;
582 set->brw_callback = ll_brw_sync_wait;
583 rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, set, NULL);
586 rc = ll_brw_sync_wait(set, CB_PHASE_START);
588 // if (rw == OBD_BRW_READ)
589 // mark_dirty_kiobuf (kiobuf, count);
591 unmap_kiobuf (kiobuf);
593 free_kiovec (1, &kiobuf);
595 OBD_FREE(pga, npages * sizeof(*pga));
597 obd_brw_set_free(set);
/* Second definition with the same signature -- presumably the branch used
 * for 2.5.0 and later kernels.  NOTE(review): neither the preprocessor line
 * separating the variants nor this variant's body is visible here. */
601 static int echo_client_ubrw(struct obd_device *obd, int rw,
602 struct obdo *oa, struct lov_stripe_md *lsm,
603 obd_off offset, obd_size count, char *buffer)
/* Open an echo-client object on behalf of an export.
 *
 * exp: export the open handle is recorded against.
 * oa:  names the object; its handle area (obdo_handle) receives the result.
 *
 * Takes a reference on the object with echo_get_object(), allocates an
 * ec_open_object record, opens the object on the target with obd_open(),
 * keeps a copy of oa in the record and links the record onto the export's
 * eced_open_head list under ec_lock.  The handle is filled with the record's
 * address and a cookie drawn from ec_unique; echo_close() matches on both.
 *
 * NOTE(review): the error checks, labels and returns are not visible in this
 * listing; the trailing OBD_FREE / echo_put_object lines are presumably the
 * failure unwinding for the record and the reference.
 */
612 echo_open (struct obd_export *exp, struct obdo *oa)
614 struct obd_device *obd = exp->exp_obd;
615 struct echo_client_obd *ec = &obd->u.echo_client;
616 struct lustre_handle *ufh = obdo_handle (oa);
617 struct ec_open_object *ecoo;
618 struct ec_object *eco;
621 rc = echo_get_object (&eco, obd, oa);
626 OBD_ALLOC (ecoo, sizeof (*ecoo));
630 rc = obd_open (&ec->ec_conn, oa, eco->eco_lsm, NULL);
634 memcpy (&ecoo->ecoo_oa, oa, sizeof (*oa));
635 ecoo->ecoo_object = eco;
636 /* ecoo takes ref from echo_get_object() above */
638 spin_lock (&ec->ec_lock);
640 list_add (&ecoo->ecoo_exp_chain, &exp->exp_ec_data.eced_open_head);
/* handle = record address + per-client unique cookie */
642 ufh->addr = (__u64)((long) ecoo);
643 ufh->cookie = ecoo->ecoo_cookie = ec->ec_unique++;
645 spin_unlock (&ec->ec_lock);
649 OBD_FREE (ecoo, sizeof (*ecoo));
651 echo_put_object (eco);
/* Close a handle previously handed out by echo_open().
 *
 * exp: export whose open list is searched.
 * oa:  carries the handle; OBD_MD_FLHANDLE in o_valid is tested first.
 *
 * Under ec_lock the export's eced_open_head list is searched for a record
 * whose address equals the handle's addr; 'found' additionally requires the
 * record's cookie to equal the handle's cookie, and the record is unlinked
 * from the list.  The object is then closed on the target with obd_close()
 * using the obdo saved at open time, the reference held by the record is
 * dropped and the record is freed.
 *
 * NOTE(review): the short guards and return statements are absent from this
 * listing, so the error codes and what exactly is conditional on 'found'
 * need confirming.
 */
656 echo_close (struct obd_export *exp, struct obdo *oa)
658 struct obd_device *obd = exp->exp_obd;
659 struct echo_client_obd *ec = &obd->u.echo_client;
660 struct lustre_handle *ufh = obdo_handle (oa);
661 struct ec_open_object *ecoo = NULL;
663 struct list_head *el;
666 if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
669 spin_lock (&ec->ec_lock);
671 list_for_each (el, &exp->exp_ec_data.eced_open_head) {
672 ecoo = list_entry (el, struct ec_open_object, ecoo_exp_chain);
673 if ((__u64)((long)ecoo) == ufh->addr) {
674 found = (ecoo->ecoo_cookie == ufh->cookie);
676 list_del (&ecoo->ecoo_exp_chain);
681 spin_unlock (&ec->ec_lock);
686 rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
687 ecoo->ecoo_object->eco_lsm, NULL);
689 echo_put_object (ecoo->ecoo_object);
690 OBD_FREE (ecoo, sizeof (*ecoo));
/* LDLM callback passed to obd_enqueue() by echo_enqueue(); 'data' is the
 * ec_object the lock was enqueued on.
 *
 * lock: lock the callback is for; converted to a handle for logging and
 *       cancellation.
 * new:  not referenced in the visible part of this function.
 * data: the ec_object, as registered at enqueue time.
 * flag: callback type; the LDLM_CB_BLOCKING / LDLM_CB_CANCELING labels below
 *       presumably switch on it (the switch line is not visible).
 *
 * As a sanity check the object list is scanned under ec_lock for 'eco'.
 * A blocking callback logs and cancels the lock with ldlm_cli_cancel(),
 * reporting a failure through CERROR; a canceling callback logs the event.
 */
696 echo_ldlm_callback (struct ldlm_lock *lock, struct ldlm_lock_desc *new,
697 void *data, int flag)
699 struct ec_object *eco = (struct ec_object *)data;
700 struct echo_client_obd *ec = &(eco->eco_device->u.echo_client);
701 struct lustre_handle lockh;
702 struct list_head *el;
706 ldlm_lock2handle (lock, &lockh);
708 /* #ifdef this out if we're not feeling paranoid */
709 spin_lock (&ec->ec_lock);
710 list_for_each (el, &ec->ec_objects) {
711 found = (eco == list_entry(el, struct ec_object,
716 spin_unlock (&ec->ec_lock);
720 case LDLM_CB_BLOCKING:
721 CDEBUG (D_INFO, "blocking callback on "LPX64", handle "LPX64"."
722 LPX64"\n", eco->eco_id, lockh.addr, lockh.cookie);
723 rc = ldlm_cli_cancel (&lockh);
725 CERROR ("ldlm_cli_cancel failed: %d\n", rc);
728 case LDLM_CB_CANCELING:
729 CDEBUG (D_INFO, "canceling callback on "LPX64", handle "LPX64"."
730 LPX64"\n", eco->eco_id, lockh.addr, lockh.cookie);
/* Enqueue an extent lock on an echo-client object for an export.
 *
 * exp:    export the lock is recorded against.
 * oa:     names the object; on success its handle area receives the lock
 *         handle and OBD_MD_FLHANDLE is set in o_valid.
 * mode:   lock mode; tested against LCK_PR and LCK_PW.
 * offset: start of the extent; tested for PAGE_SIZE alignment.
 * nob:    extent length, tested for PAGE_SIZE alignment; 0 selects an
 *         extent end of (obd_off)-1.
 *
 * Takes a reference on the object, allocates an ec_lock record describing
 * the extent, enqueues the lock with obd_enqueue() (registering
 * echo_ldlm_callback with the object as callback data), and links the record
 * onto the export's eced_locks list under ec_lock.  The handle returned to
 * the caller is the record's address plus a cookie drawn from ec_unique.
 *
 * NOTE(review): error checks, labels and returns are absent from this
 * listing; the trailing OBD_FREE / echo_put_object lines are presumably the
 * failure unwinding.
 */
741 echo_enqueue (struct obd_export *exp, struct obdo *oa,
742 int mode, obd_off offset, obd_size nob)
744 struct obd_device *obd = exp->exp_obd;
745 struct echo_client_obd *ec = &obd->u.echo_client;
746 struct lustre_handle *ulh = obdo_handle (oa);
747 struct ec_object *eco;
752 if (!(mode == LCK_PR || mode == LCK_PW))
755 if ((offset & (PAGE_SIZE - 1)) != 0 ||
756 (nob & (PAGE_SIZE - 1)) != 0)
759 rc = echo_get_object (&eco, obd, oa);
764 OBD_ALLOC (ecl, sizeof (*ecl));
768 ecl->ecl_mode = mode;
769 ecl->ecl_object = eco;
770 ecl->ecl_extent.start = offset;
771 ecl->ecl_extent.end = (nob == 0) ? ((obd_off)-1) : (offset + nob - 1);
774 rc = obd_enqueue (&ec->ec_conn, eco->eco_lsm, NULL, LDLM_EXTENT,
775 &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode,
776 &flags, echo_ldlm_callback, eco, sizeof (*eco),
781 CDEBUG (D_INFO, "enqueue handle "LPX64"."LPX64"\n",
782 ecl->ecl_handle.addr, ecl->ecl_handle.cookie);
784 /* NB ecl takes object ref from echo_get_object() above */
786 spin_lock (&ec->ec_lock);
788 list_add (&ecl->ecl_exp_chain, &exp->exp_ec_data.eced_locks);
/* user-visible handle = record address + per-client unique cookie */
790 ulh->addr = (__u64)((long)ecl);
791 ulh->cookie = ecl->ecl_cookie = ec->ec_unique++;
793 spin_unlock (&ec->ec_lock);
795 oa->o_valid |= OBD_MD_FLHANDLE;
799 OBD_FREE (ecl, sizeof (*ecl));
801 echo_put_object (eco);
/* Cancel a lock previously obtained through echo_enqueue().
 *
 * exp: export whose lock list is searched.
 * oa:  carries the handle; OBD_MD_FLHANDLE in o_valid is tested first.
 *
 * Under ec_lock the export's eced_locks list is searched for a record whose
 * address equals the handle's addr; 'found' additionally requires the
 * record's cookie to equal the handle's cookie, and the record is unlinked
 * from the list.  The lock is then cancelled with obd_cancel(), the object
 * reference held by the record is dropped and the record is freed.
 *
 * NOTE(review): the short guards, the remaining obd_cancel() arguments and
 * the return statements are absent from this listing.
 */
806 echo_cancel (struct obd_export *exp, struct obdo *oa)
808 struct obd_device *obd = exp->exp_obd;
809 struct echo_client_obd *ec = &obd->u.echo_client;
810 struct lustre_handle *ulh = obdo_handle (oa);
811 struct ec_lock *ecl = NULL;
813 struct list_head *el;
816 if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
819 spin_lock (&ec->ec_lock);
821 list_for_each (el, &exp->exp_ec_data.eced_locks) {
822 ecl = list_entry (el, struct ec_lock, ecl_exp_chain);
824 if ((__u64)((long)ecl) == ulh->addr) {
825 found = (ecl->ecl_cookie == ulh->cookie);
827 list_del (&ecl->ecl_exp_chain);
832 spin_unlock (&ec->ec_lock);
837 rc = obd_cancel (&ec->ec_conn,
838 ecl->ecl_object->eco_lsm,
842 echo_put_object (ecl->ecl_object);
843 OBD_FREE (ecl, sizeof (*ecl));
/* ioctl entry point of the echo client.
 *
 * cmd:     ioctl command.
 * obdconn: connection handle, resolved to an export with class_conn2export().
 * len:     not referenced in the visible part of this function.
 * karg:    the struct obd_ioctl_data describing the request.
 * uarg:    not referenced in the visible part of this function.
 *
 * Commands handled (all operate on data->ioc_obdo1):
 *   OBD_IOC_CREATE      - echo_create_object() with on_target = 1.
 *   OBD_IOC_DESTROY     - obd_destroy() on the target, mark the local object
 *                         deleted and drop the reference.
 *   OBD_IOC_GETATTR     - obd_getattr() on the target.
 *   OBD_IOC_SETATTR     - obd_setattr() on the target.
 *   (two labels not visible here) - echo_open() and echo_close().
 *   OBD_IOC_BRW_WRITE / OBD_IOC_BRW_READ - bulk I/O test; a NULL ioc_pbuf2
 *                         selects echo_client_kbrw(), the other call is
 *                         echo_client_ubrw().
 *   ECHO_IOC_GET_STRIPE - copy the object's stripe descriptor to ioc_pbuf1.
 *   ECHO_IOC_SET_STRIPE - NULL ioc_pbuf1 ("unset"): mark the object deleted;
 *                         otherwise echo_create_object() with on_target = 0.
 *   ECHO_IOC_ENQUEUE    - echo_enqueue(); ioc_conn1 carries the lock mode,
 *                         ioc_offset / ioc_count the extent.
 *   ECHO_IOC_CANCEL     - echo_cancel().
 * An unrecognised command is logged and fails with -ENOTTY; a missing device
 * is logged and fails with -EINVAL.
 *
 * CREATE, DESTROY, SETATTR, BRW_WRITE, SET_STRIPE and ENQUEUE require
 * CAP_SYS_ADMIN and fail with -EPERM without it.
 *
 * NOTE(review): the switch line, the 'if (rc == 0)' style guards, 'break'
 * statements and the exit label are absent from this listing.
 */
848 static int echo_iocontrol(unsigned int cmd, struct lustre_handle *obdconn,
849 int len, void *karg, void *uarg)
851 struct obd_export *exp = class_conn2export (obdconn);
852 struct obd_device *obd;
853 struct echo_client_obd *ec;
854 struct ec_object *eco;
855 struct obd_ioctl_data *data = karg;
856 int rw = OBD_BRW_READ;
861 CERROR("ioctl: No device\n");
862 GOTO(out, rc = -EINVAL);
866 ec = &obd->u.echo_client;
869 case OBD_IOC_CREATE: /* may create echo object */
870 if (!capable (CAP_SYS_ADMIN))
871 GOTO (out, rc = -EPERM);
873 rc = echo_create_object (obd, 1, &data->ioc_obdo1,
874 data->ioc_pbuf1, data->ioc_plen1);
877 case OBD_IOC_DESTROY:
878 if (!capable (CAP_SYS_ADMIN))
879 GOTO (out, rc = -EPERM);
881 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
883 rc = obd_destroy(&ec->ec_conn, &data->ioc_obdo1,
/* marking the object deleted lets echo_put_object() tear it down */
886 eco->eco_deleted = 1;
887 echo_put_object(eco);
891 case OBD_IOC_GETATTR:
892 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
894 rc = obd_getattr(&ec->ec_conn, &data->ioc_obdo1,
896 echo_put_object(eco);
900 case OBD_IOC_SETATTR:
901 if (!capable (CAP_SYS_ADMIN))
902 GOTO (out, rc = -EPERM);
904 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
906 rc = obd_setattr(&ec->ec_conn, &data->ioc_obdo1,
908 echo_put_object(eco);
913 rc = echo_open (exp, &data->ioc_obdo1);
917 rc = echo_close (exp, &data->ioc_obdo1);
920 case OBD_IOC_BRW_WRITE:
921 if (!capable (CAP_SYS_ADMIN))
922 GOTO (out, rc = -EPERM);
/* NOTE(review): the lines between the capability check and the next label
 * are not visible; presumably rw is switched to OBD_BRW_WRITE and control
 * falls through into the shared read/write path -- confirm */
926 case OBD_IOC_BRW_READ:
927 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
929 if (data->ioc_pbuf2 == NULL) // NULL user data pointer
930 rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
936 rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
942 echo_put_object(eco);
946 case ECHO_IOC_GET_STRIPE:
947 rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
949 rc = echo_copyout_lsm(eco->eco_lsm, data->ioc_pbuf1,
951 echo_put_object(eco);
955 case ECHO_IOC_SET_STRIPE:
956 if (!capable (CAP_SYS_ADMIN))
957 GOTO (out, rc = -EPERM);
959 if (data->ioc_pbuf1 == NULL) { /* unset */
960 rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
962 eco->eco_deleted = 1;
963 echo_put_object(eco);
966 rc = echo_create_object(obd, 0, &data->ioc_obdo1,
972 case ECHO_IOC_ENQUEUE:
973 if (!capable (CAP_SYS_ADMIN))
974 GOTO (out, rc = -EPERM);
976 rc = echo_enqueue (exp, &data->ioc_obdo1,
977 data->ioc_conn1, /* lock mode */
978 data->ioc_offset, data->ioc_count);/*extent*/
981 case ECHO_IOC_CANCEL:
982 rc = echo_cancel (exp, &data->ioc_obdo1);
986 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
987 GOTO (out, rc = -ENOTTY);
994 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
996 struct obd_ioctl_data* data = buf;
997 struct echo_client_obd *ec = &obddev->u.echo_client;
998 struct obd_device *tgt;
999 struct obd_uuid uuid;
1000 struct lov_stripe_md *lsm = NULL;
1001 struct obd_uuid echo_uuid = { "ECHO_UUID" };
1005 if (data->ioc_inllen1 < 1) {
1006 CERROR("requires a TARGET OBD UUID\n");
1009 if (data->ioc_inllen1 > 37) {
1010 CERROR("OBD UUID must be less than 38 characters\n");
1014 obd_str2uuid(&uuid, data->ioc_inlbuf1);
1015 tgt = class_uuid2obd(&uuid);
1016 if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
1017 !(tgt->obd_flags & OBD_SET_UP)) {
1018 CERROR("device not attached or not set up (%d)\n",
1020 RETURN(rc = -EINVAL);
1023 spin_lock_init (&ec->ec_lock);
1024 INIT_LIST_HEAD (&ec->ec_objects);
1027 rc = obd_connect(&ec->ec_conn, tgt, &echo_uuid, NULL, NULL);
1029 CERROR("fail to connect to device %d\n", data->ioc_dev);
1033 ec->ec_lsmsize = obd_alloc_memmd (&ec->ec_conn, &lsm);
1034 if (ec->ec_lsmsize < 0) {
1035 CERROR ("Can't get # stripes: %d\n", rc);
1036 obd_disconnect (&ec->ec_conn);
1037 rc = ec->ec_lsmsize;
1039 ec->ec_nstripes = lsm->lsm_stripe_count;
1040 obd_free_memmd (&ec->ec_conn, &lsm);
/* Tear down an echo client device.
 *
 * obddev: device being cleaned up.
 *
 * If the device still has exports this is logged.  NOTE(review): presumably
 * the function then returns an error -- that line is not visible here.
 * Every object left on ec_objects must be unreferenced (asserted); each is
 * given a temporary reference, marked deleted and released through
 * echo_put_object() so that the normal deletion path cancels its locks and
 * frees it.  Finally the connection to the target is closed with
 * obd_disconnect(), logging any failure.
 */
1046 static int echo_cleanup(struct obd_device * obddev)
1048 struct list_head *el;
1049 struct ec_object *eco;
1050 struct echo_client_obd *ec = &obddev->u.echo_client;
1054 if (!list_empty(&obddev->obd_exports)) {
1055 CERROR("still has clients!\n");
1059 /* XXX assuming sole access */
1060 while (!list_empty (&ec->ec_objects)) {
1061 el = ec->ec_objects.next;
1062 eco = list_entry (el, struct ec_object, eco_obj_chain);
/* fake one reference + deleted flag so the put below destroys the object */
1064 LASSERT (eco->eco_refcount == 0);
1065 eco->eco_refcount = 1;
1066 eco->eco_deleted = 1;
1067 echo_put_object (eco);
1070 rc = obd_disconnect (&ec->ec_conn);
1072 CERROR("fail to disconnect device: %d\n", rc);
1077 static int echo_connect(struct lustre_handle *conn, struct obd_device *src,
1078 struct obd_uuid *cluuid, struct recovd_obd *recovd,
1079 ptlrpc_recovery_cb_t recover)
1081 struct obd_export *exp;
1084 rc = class_connect(conn, src, cluuid);
1086 exp = class_conn2export (conn);
1087 INIT_LIST_HEAD (&exp->exp_ec_data.eced_open_head);
1088 INIT_LIST_HEAD (&exp->exp_ec_data.eced_locks);
/* Disconnect a client from the echo client device, releasing everything the
 * export still holds.
 *
 * conn: connection handle, resolved to an export with class_conn2export().
 *
 * Every lock still on the export's eced_locks list is unlinked and cancelled
 * with obd_cancel(), then its object reference is dropped and the record
 * freed.  Every open handle still on eced_open_head is unlinked and closed
 * with obd_close(), then its object reference is dropped and the record
 * freed.  Failures of individual cancels/closes are only logged.  Finally
 * the export itself goes away through class_disconnect().
 *
 * The lists are walked without ec_lock; the existing comments state there is
 * no more contention on them at this point.
 *
 * NOTE(review): the check of 'exp', the assignment of 'obd' and the return
 * statements are not visible in this listing.
 */
1094 static int echo_disconnect(struct lustre_handle *conn)
1096 struct obd_export *exp = class_conn2export (conn);
1097 struct obd_device *obd;
1098 struct echo_client_obd *ec;
1099 struct ec_open_object *ecoo;
1100 struct ec_lock *ecl;
1107 ec = &obd->u.echo_client;
1109 /* no more contention on export's lock list */
1110 while (!list_empty (&exp->exp_ec_data.eced_locks)) {
1111 ecl = list_entry (exp->exp_ec_data.eced_locks.next,
1112 struct ec_lock, ecl_exp_chain);
1113 list_del (&ecl->ecl_exp_chain);
1115 rc = obd_cancel (&ec->ec_conn, ecl->ecl_object->eco_lsm,
1116 ecl->ecl_mode, &ecl->ecl_handle);
1118 CERROR ("Cancel lock on object "LPX64" on disconnect (%d)\n",
1119 ecl->ecl_object->eco_id, rc);
1121 echo_put_object (ecl->ecl_object);
1122 OBD_FREE (ecl, sizeof (*ecl));
1125 /* no more contention on export's open handle list */
1126 while (!list_empty (&exp->exp_ec_data.eced_open_head)) {
1127 ecoo = list_entry (exp->exp_ec_data.eced_open_head.next,
1128 struct ec_open_object, ecoo_exp_chain);
1129 list_del (&ecoo->ecoo_exp_chain);
1131 rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
1132 ecoo->ecoo_object->eco_lsm, NULL);
1134 CDEBUG (D_INFO, "Closed object "LPX64" on disconnect (%d)\n",
1135 ecoo->ecoo_oa.o_id, rc);
1137 echo_put_object (ecoo->ecoo_object);
1138 OBD_FREE (ecoo, sizeof (*ecoo));
1141 rc = class_disconnect (conn);
1145 static struct obd_ops echo_obd_ops = {
1146 o_owner: THIS_MODULE,
1147 o_setup: echo_setup,
1148 o_cleanup: echo_cleanup,
1149 o_iocontrol: echo_iocontrol,
1150 o_connect: echo_connect,
1151 o_disconnect: echo_disconnect
1154 int echo_client_init(void)
1156 struct lprocfs_static_vars lvars;
1158 lprocfs_init_vars(&lvars);
1159 return class_register_type(&echo_obd_ops, lvars.module_vars,
1160 OBD_ECHO_CLIENT_DEVICENAME);
1163 void echo_client_cleanup(void)
1165 class_unregister_type(OBD_ECHO_CLIENT_DEVICENAME);