1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 #define DEBUG_SUBSYSTEM S_ECHO
24 #include <linux/version.h>
25 #include <linux/module.h>
27 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
28 #include <linux/iobuf.h>
30 #include <asm/div64.h>
32 #include <liblustre.h>
35 #include <linux/obd.h>
36 #include <linux/obd_support.h>
37 #include <linux/obd_class.h>
38 #include <linux/obd_echo.h>
39 #include <linux/lustre_debug.h>
40 #include <linux/lprocfs_status.h>
41 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_SETSTRIPE */
/* Debug helper: dump one echo-client object to the kernel log.
 *
 * The first printk emits the caller's msg, the object pointer, its id,
 * refcount and a "(deleted) " marker when eco_deleted is set, followed by
 * the striping descriptor (lsm object id, stripe size, stripe count and
 * stripe offset).  A second printk is issued per stripe with that stripe's
 * OST index and object id.
 */
45 echo_printk_object (char *msg, struct ec_object *eco)
47 struct lov_stripe_md *lsm = eco->eco_lsm;
50 printk (KERN_INFO "%s: object %p: "LPX64", refs %d%s: "LPX64
51 "=%u!%u@%d\n", msg, eco, eco->eco_id, eco->eco_refcount,
52 eco->eco_deleted ? "(deleted) " : "",
53 lsm->lsm_object_id, lsm->lsm_stripe_size,
54 lsm->lsm_stripe_count, lsm->lsm_stripe_offset);
56 for (i = 0; i < lsm->lsm_stripe_count; i++)
57 printk (KERN_INFO " [%2u]"LPX64"\n",
58 lsm->lsm_oinfo[i].loi_ost_idx,
59 lsm->lsm_oinfo[i].loi_id);
/* Search the device's ec_objects list for an object whose eco_id equals id.
 *
 * The callers visible in this file (echo_create_object, echo_get_object)
 * invoke this between spin_lock/spin_unlock on ec->ec_lock, which matches
 * the "_locked" suffix.
 * NOTE(review): the return statements are not visible in this listing;
 * callers compare the result against NULL, so presumably the match is
 * returned and NULL otherwise -- confirm against the full source.
 */
63 static struct ec_object *
64 echo_find_object_locked (struct obd_device *obd, obd_id id)
66 struct echo_client_obd *ec = &obd->u.echo_client;
67 struct ec_object *eco = NULL;
70 list_for_each (el, &ec->ec_objects) {
71 eco = list_entry (el, struct ec_object, eco_obj_chain);
73 if (eco->eco_id == id)
/* Copy a striping descriptor out to the user buffer ulsm.
 *
 * The number of bytes copied is the offset of the oinfo entry one past the
 * last stripe, i.e. the header plus lsm_stripe_count oinfo entries.
 * NOTE(review): how ulsm_nob is checked against that size is not visible
 * in this listing.
 */
80 echo_copyout_lsm (struct lov_stripe_md *lsm, void *ulsm, int ulsm_nob)
84 nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
88 if (copy_to_user (ulsm, lsm, nob))
/* Copy a striping descriptor in from user space and validate it.
 *
 * Steps visible here:
 *  - the user buffer must be at least sizeof(*lsm) bytes, and that fixed
 *    header is copied in first;
 *  - nob is then the size of lsm_stripe_count oinfo entries;
 *  - the descriptor is checked: user length below nob, stripe count above
 *    ec_nstripes, magic other than LOV_MAGIC, a stripe offset that is not
 *    0 or 0xffffffff yet is >= ec_nstripes, a stripe size that is not a
 *    multiple of PAGE_SIZE, or stripe_size * stripe_count exceeding ~0UL;
 *  - the per-stripe oinfo array is copied in after asserting that the
 *    preallocated ec_lsmsize can hold header + nob.
 * The error codes returned for each failure are not visible in this listing.
 *
 * NOTE(review): "ulsm_nob < nob" compares the user length against the
 * oinfo array size alone, not header + array -- confirm that is intended.
 * NOTE(review): ulsm_nob is an int compared against sizeof() results;
 * a negative length would presumably be converted to a large unsigned
 * value and pass the "<" checks -- confirm callers never pass one.
 */
95 echo_copyin_lsm (struct obd_device *obd, struct lov_stripe_md *lsm,
96 void *ulsm, int ulsm_nob)
98 struct echo_client_obd *ec = &obd->u.echo_client;
101 if (ulsm_nob < sizeof (*lsm))
104 if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
107 nob = lsm->lsm_stripe_count * sizeof (lsm->lsm_oinfo[0]);
109 if (ulsm_nob < nob ||
110 lsm->lsm_stripe_count > ec->ec_nstripes ||
111 lsm->lsm_magic != LOV_MAGIC ||
112 (lsm->lsm_stripe_offset != 0 &&
113 lsm->lsm_stripe_offset != 0xffffffff &&
114 lsm->lsm_stripe_offset >= ec->ec_nstripes) ||
115 (lsm->lsm_stripe_size & (PAGE_SIZE - 1)) != 0 ||
116 ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
119 LASSERT (ec->ec_lsmsize >= sizeof (*lsm) + nob);
121 if (copy_from_user(lsm->lsm_oinfo,
122 ((struct lov_stripe_md *)ulsm)->lsm_oinfo, nob))
/* Allocate and initialise a new echo-client object for obd.
 *
 * Two allocations are made: the ec_object itself and a striping descriptor
 * of ec->ec_lsmsize bytes.  If the descriptor allocation fails the object
 * is freed again.  On success the object is bound to obd, starts with
 * eco_deleted == 0 and eco_refcount == 0, and its descriptor magic is set
 * to LOV_MAGIC.  The object is not linked into ec_objects here.
 * NOTE(review): the return statements are not visible in this listing;
 * callers presumably receive NULL on allocation failure.
 */
128 static struct ec_object *
129 echo_allocate_object (struct obd_device *obd)
131 struct echo_client_obd *ec = &obd->u.echo_client;
132 struct ec_object *eco;
134 OBD_ALLOC (eco, sizeof (*eco));
138 OBD_ALLOC (eco->eco_lsm, ec->ec_lsmsize);
139 if (eco->eco_lsm == NULL) {
140 OBD_FREE (eco, sizeof (*eco));
144 eco->eco_device = obd;
145 eco->eco_deleted = 0;
146 eco->eco_refcount = 0;
147 eco->eco_lsm->lsm_magic = LOV_MAGIC;
148 /* leave stripe count 0 by default */
/* Release an echo-client object and its striping descriptor.
 *
 * Asserts that no references remain, then frees the descriptor (sized by
 * the owning device's ec_lsmsize) followed by the object itself.  The
 * object is not unlinked from any list here.
 */
154 echo_free_object (struct ec_object *eco)
156 struct obd_device *obd = eco->eco_device;
157 struct echo_client_obd *ec = &obd->u.echo_client;
159 LASSERT (eco->eco_refcount == 0);
160 OBD_FREE (eco->eco_lsm, ec->ec_lsmsize);
161 OBD_FREE (eco, sizeof (*eco));
/* Create a new echo-client object, optionally creating it on the target.
 *
 * Visible flow:
 *  - fail with "No valid oid" when oa carries no object id (OBD_MD_FLID
 *    clear) and either on_target is set or the device is striped
 *    (ec_nstripes != 0);
 *  - allocate a local object and copy in the user striping descriptor;
 *  - take the object id from oa when it is valid;
 *  - replace defaults: stripe offset 0xffffffff -> 0, stripe count 0 ->
 *    ec_nstripes, stripe size 0 -> PAGE_SIZE;
 *  - for every stripe, default a zero loi_id to the object id and assign
 *    OST indices round-robin starting at the stripe offset;
 *  - call obd_create() and adopt the object id it reports;
 *  - under ec_lock, refuse the create if an object with that id is already
 *    registered (calling obd_destroy), otherwise add the new object to
 *    ec_objects.
 *
 * NOTE(review): the conditions guarding the copy-in, obd_create and
 * obd_destroy calls, and all return/goto statements, are not visible in
 * this listing; the error message suggests the destroy only undoes an
 * on_target create.  The trailing echo_free_object() is presumably the
 * shared failure path -- confirm against the full source.
 */
165 echo_create_object (struct obd_device *obd, int on_target, struct obdo *oa,
166 void *ulsm, int ulsm_nob)
168 struct echo_client_obd *ec = &obd->u.echo_client;
169 struct ec_object *eco2;
170 struct ec_object *eco;
171 struct lov_stripe_md *lsm;
175 if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
176 (on_target || /* set_stripe */
177 ec->ec_nstripes != 0)) { /* LOV */
178 CERROR ("No valid oid\n");
182 eco = echo_allocate_object (obd);
189 rc = echo_copyin_lsm (obd, lsm, ulsm, ulsm_nob);
194 /* setup object ID here for !on_target and LOV hint */
195 if ((oa->o_valid & OBD_MD_FLID) != 0)
196 eco->eco_id = lsm->lsm_object_id = oa->o_id;
198 /* defaults -> actual values */
199 if (lsm->lsm_stripe_offset == 0xffffffff)
200 lsm->lsm_stripe_offset = 0;
202 if (lsm->lsm_stripe_count == 0)
203 lsm->lsm_stripe_count = ec->ec_nstripes;
205 if (lsm->lsm_stripe_size == 0)
206 lsm->lsm_stripe_size = PAGE_SIZE;
208 /* setup stripes: indices + default ids if required */
209 for (i = 0; i < lsm->lsm_stripe_count; i++) {
210 if (lsm->lsm_oinfo[i].loi_id == 0)
211 lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id;
213 lsm->lsm_oinfo[i].loi_ost_idx =
214 (lsm->lsm_stripe_offset + i) % ec->ec_nstripes;
218 rc = obd_create (&ec->ec_conn, oa, &lsm, NULL);
222 /* See what object ID we were given */
223 LASSERT ((oa->o_valid & OBD_MD_FLID) != 0);
224 eco->eco_id = lsm->lsm_object_id = oa->o_id;
227 spin_lock (&ec->ec_lock);
229 eco2 = echo_find_object_locked (obd, oa->o_id);
230 if (eco2 != NULL) { /* conflict */
231 spin_unlock (&ec->ec_lock);
233 CERROR ("Can't create object id "LPX64": id already exists%s\n",
234 oa->o_id, on_target ? " (undoing create)" : "");
237 obd_destroy (&ec->ec_conn, oa, lsm, NULL);
243 list_add (&eco->eco_obj_chain, &ec->ec_objects);
244 spin_unlock (&ec->ec_lock);
246 "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
248 eco->eco_lsm->lsm_stripe_size,
249 eco->eco_lsm->lsm_stripe_count,
250 eco->eco_lsm->lsm_stripe_offset,
251 eco->eco_refcount, eco->eco_deleted);
255 echo_free_object (eco);
/* Look up (or implicitly create) the object named by oa->o_id and hand a
 * reference to it back through *ecop.
 *
 * Visible flow:
 *  - oa must carry a valid object id, otherwise "No valid oid" is logged;
 *  - under ec_lock the id is looked up; an object that is marked deleted
 *    yields -EAGAIN;
 *  - when nothing is found the ec_nstripes != 0 test ("striping required")
 *    is applied; its outcome is not visible, but a new object is then
 *    allocated with the requested id;
 *  - the lookup is repeated under ec_lock to detect a racing creator: if
 *    none is found the new object is listed with refcount 1; otherwise the
 *    existing object is used (refcount incremented) unless it is being
 *    deleted (-EAGAIN), and the freshly allocated object is freed.
 *
 * NOTE(review): return statements, the refcount increment on the first
 * lookup and the *ecop assignments are not visible in this listing.
 */
260 echo_get_object (struct ec_object **ecop, struct obd_device *obd,
263 struct echo_client_obd *ec = &obd->u.echo_client;
264 struct ec_object *eco;
265 struct ec_object *eco2;
268 if ((oa->o_valid & OBD_MD_FLID) == 0)
270 CERROR ("No valid oid\n");
274 spin_lock (&ec->ec_lock);
275 eco = echo_find_object_locked (obd, oa->o_id);
/* NOTE(review): this early return sits between the spin_lock above and the
 * spin_unlock below with no unlock visible on its path, so it looks like
 * -EAGAIN is returned with ec_lock still held -- confirm and, if so, drop
 * the lock before returning. */
277 if (eco->eco_deleted) /* being deleted */
278 return (-EAGAIN); /* (see comment in cleanup) */
281 spin_unlock (&ec->ec_lock);
284 "found %p: "LPX64"=%u#%u&%d refs %d del %d\n",
286 eco->eco_lsm->lsm_stripe_size,
287 eco->eco_lsm->lsm_stripe_count,
288 eco->eco_lsm->lsm_stripe_offset,
289 eco->eco_refcount, eco->eco_deleted);
292 spin_unlock (&ec->ec_lock);
294 if (ec->ec_nstripes != 0) /* striping required */
297 eco = echo_allocate_object (obd);
301 eco->eco_id = eco->eco_lsm->lsm_object_id = oa->o_id;
303 spin_lock (&ec->ec_lock);
305 eco2 = echo_find_object_locked (obd, oa->o_id);
306 if (eco2 == NULL) { /* didn't race */
307 list_add (&eco->eco_obj_chain, &ec->ec_objects);
308 spin_unlock (&ec->ec_lock);
309 eco->eco_refcount = 1;
312 "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
314 eco->eco_lsm->lsm_stripe_size,
315 eco->eco_lsm->lsm_stripe_count,
316 eco->eco_lsm->lsm_stripe_offset,
317 eco->eco_refcount, eco->eco_deleted);
321 if (eco2->eco_deleted)
322 rc = -EAGAIN; /* lose race */
324 eco2->eco_refcount++; /* take existing */
327 LASSERT (eco2->eco_id == eco2->eco_lsm->lsm_object_id);
329 "found(2) %p: "LPX64"=%u#%u&%d refs %d del %d\n",
331 eco2->eco_lsm->lsm_stripe_size,
332 eco2->eco_lsm->lsm_stripe_count,
333 eco2->eco_lsm->lsm_stripe_offset,
334 eco2->eco_refcount, eco2->eco_deleted);
337 spin_unlock (&ec->ec_lock);
339 echo_free_object (eco);
/* Drop the caller's reference on eco; destroy the object when it was the
 * last reference and the object is marked deleted.
 *
 * Under ec_lock the refcount is checked (asserted non-negative).  While
 * references remain, or the object is not marked deleted, the lock is
 * simply released.  Otherwise the object stays on ec_objects while
 * obd_cancel_unused() runs against its striping descriptor, and only then
 * is it unlinked under ec_lock and freed.
 * NOTE(review): the refcount decrement and the early return are not
 * visible in this listing.
 */
344 echo_put_object (struct ec_object *eco)
346 struct obd_device *obd = eco->eco_device;
347 struct echo_client_obd *ec = &obd->u.echo_client;
349 /* Release caller's ref on the object.
350 * delete => mark for deletion when last ref goes
353 spin_lock (&ec->ec_lock);
356 LASSERT (eco->eco_refcount >= 0);
358 CDEBUG(D_INFO, "put %p: "LPX64"=%u#%u&%d refs %d del %d\n",
360 eco->eco_lsm->lsm_stripe_size,
361 eco->eco_lsm->lsm_stripe_count,
362 eco->eco_lsm->lsm_stripe_offset,
363 eco->eco_refcount, eco->eco_deleted);
365 if (eco->eco_refcount != 0 || !eco->eco_deleted) {
366 spin_unlock (&ec->ec_lock);
370 spin_unlock (&ec->ec_lock);
372 /* NB leave obj in the object list. We must prevent anyone from
373 * attempting to enqueue on this object number until we can be
374 * sure there will be no more lock callbacks.
376 obd_cancel_unused(&ec->ec_conn, eco->eco_lsm, 0, NULL);
378 /* now we can let it go */
379 spin_lock (&ec->ec_lock);
380 list_del (&eco->eco_obj_chain);
381 spin_unlock (&ec->ec_lock);
383 LASSERT (eco->eco_refcount == 0);
385 echo_free_object (eco);
/* Translate a file offset into the id of the stripe object that holds it
 * and the offset inside that stripe object.
 *
 * On entry *offp is presumably the file offset and *idp the whole-object
 * id (the loads into the local "offset" are not visible in this listing).
 * Descriptors with at most one stripe take an early path whose body is
 * not visible here.  Otherwise:
 *   width        = stripe_size * stripe_count (bytes in one full row),
 *   woffset      = offset modulo width; do_div leaves the number of whole
 *                  rows in "offset",
 *   stripe_index = woffset / stripe_size,
 *   *idp         = that stripe's loi_id,
 *   *offp        = rows * stripe_size + woffset % stripe_size.
 */
389 echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
391 unsigned long stripe_count;
392 unsigned long stripe_size;
394 unsigned long woffset;
398 if (lsm->lsm_stripe_count <= 1)
402 stripe_size = lsm->lsm_stripe_size;
403 stripe_count = lsm->lsm_stripe_count;
405 /* width = # bytes in all stripes */
406 width = stripe_size * stripe_count;
408 /* woffset = offset within a width; offset = whole number of widths */
409 woffset = do_div (offset, width);
411 stripe_index = woffset / stripe_size;
413 *idp = lsm->lsm_oinfo[stripe_index].loi_id;
414 *offp = offset * stripe_size + woffset % stripe_size;
/* Bulk read/write test using kernel-allocated pages.
 *
 * oa->o_id doubles as a test selector: a non-zero id enables data
 * verification, and an odd id allocates pages with GFP_HIGHUSER instead of
 * GFP_KERNEL.  rw must be OBD_BRW_WRITE or OBD_BRW_READ.  The request is
 * validated (count a multiple of PAGE_SIZE, lsm object id matching oa; the
 * remaining conditions are not visible in this listing) and split into
 * count >> PAGE_SHIFT single-page brw_page entries.
 *
 * Each page is allocated and stamped with page_debug_setup(); for writes
 * the stamp uses the per-stripe offset and id from
 * echo_get_stripe_off_id().  After obd_brw() every page is checked with
 * page_debug_check() against the per-stripe offset/id, then released, and
 * the page array is freed.
 * NOTE(review): the conditions selecting when stamping and checking run
 * (presumably "verify" and the rw direction) and the error paths are not
 * visible here.
 */
418 echo_client_kbrw (struct obd_device *obd, int rw,
419 struct obdo *oa, struct lov_stripe_md *lsm,
420 obd_off offset, obd_size count)
422 struct echo_client_obd *ec = &obd->u.echo_client;
424 struct brw_page *pga;
425 struct brw_page *pgp;
432 /* oa_id == 0 => speed test (no verification) else...
433 * oa & 1 => use HIGHMEM
435 verify = (oa->o_id != 0);
436 gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
438 LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
441 (count & (PAGE_SIZE - 1)) != 0 ||
443 lsm->lsm_object_id != oa->o_id))
446 /* XXX think again with misaligned I/O */
447 npages = count >> PAGE_SHIFT;
449 OBD_ALLOC(pga, npages * sizeof(*pga));
453 for (i = 0, pgp = pga, off = offset;
455 i++, pgp++, off += PAGE_SIZE) {
457 LASSERT (pgp->pg == NULL); /* for cleanup */
460 pgp->pg = alloc_pages (gfp_mask, 0);
464 pgp->count = PAGE_SIZE;
469 void *addr = kmap(pgp->pg);
470 obd_off stripe_off = off;
471 obd_id stripe_id = oa->o_id;
473 if (rw == OBD_BRW_WRITE) {
474 echo_get_stripe_off_id(lsm, &stripe_off,
476 page_debug_setup(addr, pgp->count,
477 stripe_off, stripe_id);
479 page_debug_setup(addr, pgp->count,
487 rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, NULL);
493 for (i = 0, pgp = pga; i < npages; i++, pgp++) {
498 void *addr = kmap(pgp->pg);
499 obd_off stripe_off = pgp->off;
500 obd_id stripe_id = oa->o_id;
503 echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
504 vrc = page_debug_check("test_brw", addr, pgp->count,
505 stripe_off, stripe_id);
506 if (vrc != 0 && rc == 0)
511 __free_pages(pgp->pg, 0);
513 OBD_FREE(pga, npages * sizeof(*pga));
/* Bulk read/write test that transfers directly to/from a user buffer.
 *
 * Two variants are selected by kernel version.  The pre-2.5 variant
 * requires a page-aligned buffer and a page-multiple count (plus a matching
 * lsm object id when lsm is non-NULL), maps the user buffer with a kiobuf,
 * builds one brw_page per mapped page, calls obd_brw(), and then unmaps
 * the kiobuf, frees the kiovec and frees the page array.
 * The body of the second variant is not visible in this listing.
 */
518 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
519 static int echo_client_ubrw(struct obd_device *obd, int rw,
520 struct obdo *oa, struct lov_stripe_md *lsm,
521 obd_off offset, obd_size count, char *buffer)
523 struct echo_client_obd *ec = &obd->u.echo_client;
525 struct brw_page *pga;
526 struct brw_page *pgp;
528 struct kiobuf *kiobuf;
532 LASSERT (rw == OBD_BRW_WRITE ||
535 /* NB: for now, only whole pages, page aligned */
538 ((long)buffer & (PAGE_SIZE - 1)) != 0 ||
539 (count & (PAGE_SIZE - 1)) != 0 ||
540 (lsm != NULL && lsm->lsm_object_id != oa->o_id))
543 /* XXX think again with misaligned I/O */
544 npages = count >> PAGE_SHIFT;
546 OBD_ALLOC(pga, npages * sizeof(*pga));
550 rc = alloc_kiovec (1, &kiobuf);
554 rc = map_user_kiobuf ((rw == OBD_BRW_READ) ? READ : WRITE,
555 kiobuf, (unsigned long)buffer, count);
559 LASSERT (kiobuf->offset == 0);
560 LASSERT (kiobuf->nr_pages == npages);
562 for (i = 0, off = offset, pgp = pga;
564 i++, off += PAGE_SIZE, pgp++) {
566 pgp->pg = kiobuf->maplist[i];
567 pgp->count = PAGE_SIZE;
571 rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, NULL);
573 // if (rw == OBD_BRW_READ)
574 // mark_dirty_kiobuf (kiobuf, count);
576 unmap_kiobuf (kiobuf);
578 free_kiovec (1, &kiobuf);
580 OBD_FREE(pga, npages * sizeof(*pga));
584 static int echo_client_ubrw(struct obd_device *obd, int rw,
585 struct obdo *oa, struct lov_stripe_md *lsm,
586 obd_off offset, obd_size count, char *buffer)
/* Open the object named by oa on behalf of an export.
 *
 * Takes a reference on the object via echo_get_object(), allocates an
 * open-handle record, and opens the object on the target with obd_open().
 * On success the record keeps a copy of oa and the object reference, is
 * linked onto the export's eced_open_head list under ec_lock, and receives
 * a fresh cookie from ec_unique that is also written into the handle
 * embedded in oa.
 * NOTE(review): the trailing OBD_FREE/echo_put_object pair is presumably
 * the failure unwind; the branching is not visible in this listing.
 */
595 echo_open (struct obd_export *exp, struct obdo *oa)
597 struct obd_device *obd = exp->exp_obd;
598 struct echo_client_obd *ec = &obd->u.echo_client;
599 struct lustre_handle *ufh = obdo_handle (oa);
600 struct ec_open_object *ecoo;
601 struct ec_object *eco;
604 rc = echo_get_object (&eco, obd, oa);
609 OBD_ALLOC (ecoo, sizeof (*ecoo));
613 rc = obd_open(&ec->ec_conn, oa, eco->eco_lsm, NULL, &ecoo->ecoo_och);
617 memcpy (&ecoo->ecoo_oa, oa, sizeof (*oa));
618 ecoo->ecoo_object = eco;
619 /* ecoo takes ref from echo_get_object() above */
621 spin_lock (&ec->ec_lock);
623 list_add (&ecoo->ecoo_exp_chain, &exp->exp_ec_data.eced_open_head);
624 ufh->cookie = ecoo->ecoo_cookie = ec->ec_unique++;
625 spin_unlock (&ec->ec_lock);
629 OBD_FREE (ecoo, sizeof (*ecoo));
631 echo_put_object (eco);
/* Close an open handle previously returned by echo_open().
 *
 * oa must carry a handle (OBD_MD_FLHANDLE).  Under ec_lock the export's
 * open list is searched for the record whose cookie matches the handle in
 * oa, and that record is unlinked.  The saved target open handle is then
 * copied into the record's obdo inline area, OBD_MD_FLHANDLE is set, the
 * object is closed on the target with obd_close(), the object reference is
 * dropped and the record freed.
 * NOTE(review): the handling of the not-found case is not visible in this
 * listing.
 */
636 echo_close (struct obd_export *exp, struct obdo *oa)
638 struct obd_device *obd = exp->exp_obd;
639 struct echo_client_obd *ec = &obd->u.echo_client;
640 struct lustre_handle *ufh = obdo_handle (oa);
641 struct ec_open_object *ecoo = NULL;
643 struct list_head *el;
646 if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
649 spin_lock (&ec->ec_lock);
651 list_for_each (el, &exp->exp_ec_data.eced_open_head) {
652 ecoo = list_entry (el, struct ec_open_object, ecoo_exp_chain);
653 found = (ecoo->ecoo_cookie == ufh->cookie);
655 list_del (&ecoo->ecoo_exp_chain);
660 spin_unlock (&ec->ec_lock);
662 memcpy(&ecoo->ecoo_oa.o_inline, &ecoo->ecoo_och, FD_OSTDATA_SIZE);
663 ecoo->ecoo_oa.o_valid |= OBD_MD_FLHANDLE;
665 rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
666 ecoo->ecoo_object->eco_lsm, NULL);
668 echo_put_object (ecoo->ecoo_object);
669 OBD_FREE (ecoo, sizeof (*ecoo));
/* Lock callback registered by echo_enqueue(); data is the ec_object the
 * lock was taken on.
 *
 * The lock is converted to a handle, and as a sanity ("paranoid") check
 * the device's object list is scanned under ec_lock for the object.  The
 * flag then selects the action:
 *   LDLM_CB_BLOCKING  - cancel the lock with ldlm_cli_cancel(), logging an
 *                       error if that fails;
 *   LDLM_CB_CANCELING - only emit a debug message.
 * NOTE(review): what happens when the object is not on the list, and the
 * default switch case, are not visible in this listing.
 */
675 echo_ldlm_callback (struct ldlm_lock *lock, struct ldlm_lock_desc *new,
676 void *data, int flag)
678 struct ec_object *eco = (struct ec_object *)data;
679 struct echo_client_obd *ec = &(eco->eco_device->u.echo_client);
680 struct lustre_handle lockh;
681 struct list_head *el;
685 ldlm_lock2handle (lock, &lockh);
687 /* #ifdef this out if we're not feeling paranoid */
688 spin_lock (&ec->ec_lock);
689 list_for_each (el, &ec->ec_objects) {
690 found = (eco == list_entry(el, struct ec_object,
695 spin_unlock (&ec->ec_lock);
699 case LDLM_CB_BLOCKING:
700 CDEBUG(D_INFO, "blocking callback on "LPX64", handle "LPX64"\n",
701 eco->eco_id, lockh.cookie);
702 rc = ldlm_cli_cancel (&lockh);
704 CERROR ("ldlm_cli_cancel failed: %d\n", rc);
707 case LDLM_CB_CANCELING:
708 CDEBUG(D_INFO, "cancel callback on "LPX64", handle "LPX64"\n",
709 eco->eco_id, lockh.cookie);
/* Take an extent lock on the object named by oa on behalf of an export.
 *
 * mode must be LCK_PR or LCK_PW, and both offset and nob must be multiples
 * of PAGE_SIZE.  The locked extent starts at offset and ends at
 * offset + nob - 1, or at the maximum obd_off value when nob is 0.
 * A reference on the object is taken, a lock record allocated, and the
 * lock requested with obd_enqueue() using echo_ldlm_callback.  On success
 * the record (which keeps the object reference) is linked onto the
 * export's eced_locks list under ec_lock, a fresh cookie from ec_unique is
 * stored in it and in the handle inside oa, and OBD_MD_FLHANDLE is set in
 * oa->o_valid.
 * NOTE(review): the trailing OBD_FREE/echo_put_object pair is presumably
 * the failure unwind; the branching is not visible in this listing.
 */
720 echo_enqueue (struct obd_export *exp, struct obdo *oa,
721 int mode, obd_off offset, obd_size nob)
723 struct obd_device *obd = exp->exp_obd;
724 struct echo_client_obd *ec = &obd->u.echo_client;
725 struct lustre_handle *ulh = obdo_handle (oa);
726 struct ec_object *eco;
731 if (!(mode == LCK_PR || mode == LCK_PW))
734 if ((offset & (PAGE_SIZE - 1)) != 0 ||
735 (nob & (PAGE_SIZE - 1)) != 0)
738 rc = echo_get_object (&eco, obd, oa);
743 OBD_ALLOC (ecl, sizeof (*ecl));
747 ecl->ecl_mode = mode;
748 ecl->ecl_object = eco;
749 ecl->ecl_extent.start = offset;
750 ecl->ecl_extent.end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
753 rc = obd_enqueue(&ec->ec_conn, eco->eco_lsm, NULL, LDLM_EXTENT,
754 &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode,
755 &flags, echo_ldlm_callback, eco,
756 &ecl->ecl_lock_handle);
760 CDEBUG(D_INFO, "enqueue handle "LPX64"\n", ecl->ecl_lock_handle.cookie);
762 /* NB ecl takes object ref from echo_get_object() above */
763 spin_lock(&ec->ec_lock);
765 list_add(&ecl->ecl_exp_chain, &exp->exp_ec_data.eced_locks);
766 ulh->cookie = ecl->ecl_cookie = ec->ec_unique++;
768 spin_unlock(&ec->ec_lock);
770 oa->o_valid |= OBD_MD_FLHANDLE;
774 OBD_FREE (ecl, sizeof (*ecl));
776 echo_put_object (eco);
/* Release a lock previously obtained through echo_enqueue().
 *
 * oa must carry a handle (OBD_MD_FLHANDLE).  Under ec_lock the export's
 * lock list is searched for the record whose cookie matches the handle in
 * oa, and that record is unlinked.  The lock is then cancelled on the
 * target with obd_cancel(), the object reference is dropped and the record
 * freed.
 * NOTE(review): the handling of the not-found case is not visible in this
 * listing.
 */
781 echo_cancel (struct obd_export *exp, struct obdo *oa)
783 struct obd_device *obd = exp->exp_obd;
784 struct echo_client_obd *ec = &obd->u.echo_client;
785 struct lustre_handle *ulh = obdo_handle (oa);
786 struct ec_lock *ecl = NULL;
788 struct list_head *el;
791 if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
794 spin_lock (&ec->ec_lock);
796 list_for_each (el, &exp->exp_ec_data.eced_locks) {
797 ecl = list_entry (el, struct ec_lock, ecl_exp_chain);
798 found = (ecl->ecl_cookie == ulh->cookie);
800 list_del (&ecl->ecl_exp_chain);
805 spin_unlock (&ec->ec_lock);
810 rc = obd_cancel(&ec->ec_conn, ecl->ecl_object->eco_lsm, ecl->ecl_mode,
811 &ecl->ecl_lock_handle);
813 echo_put_object (ecl->ecl_object);
814 OBD_FREE (ecl, sizeof (*ecl));
/* ioctl entry point for the echo client (installed as o_iocontrol).
 *
 * Resolves the export from obdconn, dispatches on cmd, and drops the
 * export reference before returning.  Commands visible here:
 *   OBD_IOC_CREATE      - create an object on the target (CAP_SYS_ADMIN)
 *   OBD_IOC_DESTROY     - destroy on the target and mark the local object
 *                         deleted (CAP_SYS_ADMIN)
 *   OBD_IOC_GETATTR     - obd_getattr on the object
 *   OBD_IOC_SETATTR     - obd_setattr on the object (CAP_SYS_ADMIN)
 *   open / close        - echo_open / echo_close (case labels not visible)
 *   OBD_IOC_BRW_WRITE   - bulk write (CAP_SYS_ADMIN)
 *   OBD_IOC_BRW_READ    - bulk read; a NULL ioc_pbuf2 selects the
 *                         kernel-page path (echo_client_kbrw), otherwise
 *                         the user-buffer path (echo_client_ubrw)
 *   ECHO_IOC_GET_STRIPE - copy the striping descriptor out to user space
 *   ECHO_IOC_SET_STRIPE - NULL ioc_pbuf1 marks the object deleted,
 *                         otherwise creates a local-only object
 *                         (CAP_SYS_ADMIN)
 *   ECHO_IOC_ENQUEUE    - take an extent lock (CAP_SYS_ADMIN)
 *   ECHO_IOC_CANCEL     - release a lock
 * Unrecognised commands are logged and fail with -ENOTTY; failed
 * capability checks fail with -EPERM.
 * NOTE(review): rw is initialised to OBD_BRW_READ; the write case
 * presumably overrides it and falls through into the read case, but those
 * lines are not visible in this listing.
 */
819 static int echo_iocontrol(unsigned int cmd, struct lustre_handle *obdconn,
820 int len, void *karg, void *uarg)
822 struct obd_export *exp = class_conn2export (obdconn);
823 struct obd_device *obd;
824 struct echo_client_obd *ec;
825 struct ec_object *eco;
826 struct obd_ioctl_data *data = karg;
827 int rw = OBD_BRW_READ;
832 CERROR("ioctl: No device\n");
833 GOTO(out, rc = -EINVAL);
837 ec = &obd->u.echo_client;
840 case OBD_IOC_CREATE: /* may create echo object */
841 if (!capable (CAP_SYS_ADMIN))
842 GOTO (out, rc = -EPERM);
844 rc = echo_create_object (obd, 1, &data->ioc_obdo1,
845 data->ioc_pbuf1, data->ioc_plen1);
848 case OBD_IOC_DESTROY:
849 if (!capable (CAP_SYS_ADMIN))
850 GOTO (out, rc = -EPERM);
852 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
854 rc = obd_destroy(&ec->ec_conn, &data->ioc_obdo1,
857 eco->eco_deleted = 1;
858 echo_put_object(eco);
862 case OBD_IOC_GETATTR:
863 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
865 rc = obd_getattr(&ec->ec_conn, &data->ioc_obdo1,
867 echo_put_object(eco);
871 case OBD_IOC_SETATTR:
872 if (!capable (CAP_SYS_ADMIN))
873 GOTO (out, rc = -EPERM);
875 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
877 rc = obd_setattr(&ec->ec_conn, &data->ioc_obdo1,
879 echo_put_object(eco);
884 rc = echo_open (exp, &data->ioc_obdo1);
888 rc = echo_close (exp, &data->ioc_obdo1);
891 case OBD_IOC_BRW_WRITE:
892 if (!capable (CAP_SYS_ADMIN))
893 GOTO (out, rc = -EPERM);
897 case OBD_IOC_BRW_READ:
898 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
900 if (data->ioc_pbuf2 == NULL) // NULL user data pointer
901 rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
907 rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
913 echo_put_object(eco);
917 case ECHO_IOC_GET_STRIPE:
918 rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
920 rc = echo_copyout_lsm(eco->eco_lsm, data->ioc_pbuf1,
922 echo_put_object(eco);
926 case ECHO_IOC_SET_STRIPE:
927 if (!capable (CAP_SYS_ADMIN))
928 GOTO (out, rc = -EPERM);
930 if (data->ioc_pbuf1 == NULL) { /* unset */
931 rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
933 eco->eco_deleted = 1;
934 echo_put_object(eco);
937 rc = echo_create_object(obd, 0, &data->ioc_obdo1,
943 case ECHO_IOC_ENQUEUE:
944 if (!capable (CAP_SYS_ADMIN))
945 GOTO (out, rc = -EPERM);
947 rc = echo_enqueue (exp, &data->ioc_obdo1,
948 data->ioc_conn1, /* lock mode */
949 data->ioc_offset, data->ioc_count);/*extent*/
952 case ECHO_IOC_CANCEL:
953 rc = echo_cancel (exp, &data->ioc_obdo1);
957 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
958 GOTO (out, rc = -ENOTTY);
963 class_export_put(exp);
/* Device setup (installed as o_setup): bind the echo client to its target.
 *
 * buf is an obd_ioctl_data whose first inline buffer names the target OBD
 * by UUID; the length must be between 1 and 37.  The target must exist and
 * be both attached and set up.  The client state (ec_lock, ec_objects) is
 * initialised, a connection to the target is made under the client UUID
 * "ECHO_UUID", and obd_alloc_memmd() is used to learn the size of a
 * striping descriptor (ec_lsmsize) and the stripe count (ec_nstripes);
 * the probe descriptor is freed again.  If the probe fails the connection
 * is dropped and the negative size becomes the return code.
 */
967 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
969 struct obd_ioctl_data* data = buf;
970 struct echo_client_obd *ec = &obddev->u.echo_client;
971 struct obd_device *tgt;
972 struct obd_uuid uuid;
973 struct lov_stripe_md *lsm = NULL;
974 struct obd_uuid echo_uuid = { "ECHO_UUID" };
978 if (data->ioc_inllen1 < 1) {
979 CERROR("requires a TARGET OBD UUID\n");
982 if (data->ioc_inllen1 > 37) {
983 CERROR("OBD UUID must be less than 38 characters\n");
987 obd_str2uuid(&uuid, data->ioc_inlbuf1);
988 tgt = class_uuid2obd(&uuid);
989 if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
990 CERROR("device not attached or not set up (%d)\n",
992 RETURN(rc = -EINVAL);
995 spin_lock_init (&ec->ec_lock);
996 INIT_LIST_HEAD (&ec->ec_objects);
999 rc = obd_connect(&ec->ec_conn, tgt, &echo_uuid);
1001 CERROR("fail to connect to device %d\n", data->ioc_dev);
1005 ec->ec_lsmsize = obd_alloc_memmd (&ec->ec_conn, &lsm);
1006 if (ec->ec_lsmsize < 0) {
/* NOTE(review): rc still holds the obd_connect() result at this point; the
 * failing value is ec->ec_lsmsize, which is only copied into rc two
 * statements later, so this message likely logs the wrong number. */
1007 CERROR ("Can't get # stripes: %d\n", rc);
1008 obd_disconnect (&ec->ec_conn, 0);
1009 rc = ec->ec_lsmsize;
1011 ec->ec_nstripes = lsm->lsm_stripe_count;
1012 obd_free_memmd (&ec->ec_conn, &lsm);
/* Device cleanup (installed as o_cleanup).
 *
 * Logs "still has clients!" if any export is still attached.  Otherwise
 * every object left on ec_objects is torn down: each must be unreferenced,
 * is given a temporary reference and marked deleted, and is then released
 * through echo_put_object(), which unlinks and frees it.  Finally the
 * connection to the target is dropped, logging an error if that fails.
 * NOTE(review): the return value on the "still has clients" path is not
 * visible in this listing.
 */
1018 static int echo_cleanup(struct obd_device * obddev, int force, int failover)
1020 struct list_head *el;
1021 struct ec_object *eco;
1022 struct echo_client_obd *ec = &obddev->u.echo_client;
1026 if (!list_empty(&obddev->obd_exports)) {
1027 CERROR("still has clients!\n");
1031 /* XXX assuming sole access */
1032 while (!list_empty (&ec->ec_objects)) {
1033 el = ec->ec_objects.next;
1034 eco = list_entry (el, struct ec_object, eco_obj_chain);
1036 LASSERT (eco->eco_refcount == 0);
1037 eco->eco_refcount = 1;
1038 eco->eco_deleted = 1;
1039 echo_put_object (eco);
1042 rc = obd_disconnect (&ec->ec_conn, 0);
1044 CERROR("fail to disconnect device: %d\n", rc);
/* Connect handler (installed as o_connect).
 *
 * Performs the generic class_connect() and then initialises the new
 * export's per-client lists of open handles and locks.  The temporary
 * export reference taken by class_conn2export() is dropped afterwards.
 * NOTE(review): the check of class_connect()'s result is not visible in
 * this listing.
 */
1049 static int echo_connect(struct lustre_handle *conn, struct obd_device *src,
1050 struct obd_uuid *cluuid)
1052 struct obd_export *exp;
1055 rc = class_connect(conn, src, cluuid);
1057 exp = class_conn2export (conn);
1058 INIT_LIST_HEAD(&exp->exp_ec_data.eced_open_head);
1059 INIT_LIST_HEAD(&exp->exp_ec_data.eced_locks);
1060 class_export_put(exp);
/* Disconnect handler (installed as o_disconnect).
 *
 * Releases everything the departing export still holds:
 *  - every lock on eced_locks is unlinked, cancelled on the target with
 *    obd_cancel(), its object reference dropped and its record freed;
 *  - every open handle on eced_open_head is unlinked, closed on the target
 *    with obd_close() (after restoring the saved target handle into the
 *    record's obdo), its object reference dropped and its record freed.
 * The results of the individual cancel/close calls are only logged.  The
 * generic class_disconnect() then runs and the export reference taken at
 * entry is dropped.
 */
1066 static int echo_disconnect(struct lustre_handle *conn, int failover)
1068 struct obd_export *exp = class_conn2export (conn);
1069 struct obd_device *obd;
1070 struct echo_client_obd *ec;
1071 struct ec_open_object *ecoo;
1072 struct ec_lock *ecl;
1076 GOTO(out, rc = -EINVAL);
1079 ec = &obd->u.echo_client;
1081 /* no more contention on export's lock list */
1082 while (!list_empty (&exp->exp_ec_data.eced_locks)) {
1083 ecl = list_entry (exp->exp_ec_data.eced_locks.next,
1084 struct ec_lock, ecl_exp_chain);
1085 list_del (&ecl->ecl_exp_chain);
1087 rc = obd_cancel (&ec->ec_conn, ecl->ecl_object->eco_lsm,
1088 ecl->ecl_mode, &ecl->ecl_lock_handle);
1090 CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect (%d)\n",
1091 ecl->ecl_object->eco_id, rc);
1093 echo_put_object (ecl->ecl_object);
1094 OBD_FREE (ecl, sizeof (*ecl));
1097 /* no more contention on export's open handle list */
1098 while (!list_empty (&exp->exp_ec_data.eced_open_head)) {
1099 ecoo = list_entry (exp->exp_ec_data.eced_open_head.next,
1100 struct ec_open_object, ecoo_exp_chain);
1101 list_del (&ecoo->ecoo_exp_chain);
1103 memcpy (&ecoo->ecoo_oa.o_inline, &ecoo->ecoo_och,
1105 ecoo->ecoo_oa.o_valid |= OBD_MD_FLHANDLE;
1107 rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
1108 ecoo->ecoo_object->eco_lsm, NULL);
1110 CDEBUG (D_INFO, "Closed object "LPX64" on disconnect (%d)\n",
1111 ecoo->ecoo_oa.o_id, rc);
1113 echo_put_object (ecoo->ecoo_object);
1114 OBD_FREE (ecoo, sizeof (*ecoo));
1117 rc = class_disconnect (conn, 0);
1120 class_export_put(exp);
/* Method table for the echo client device type: wires the setup, cleanup,
 * ioctl, connect and disconnect handlers defined in this file and records
 * this module as the owner. */
1124 static struct obd_ops echo_obd_ops = {
1125 o_owner: THIS_MODULE,
1126 o_setup: echo_setup,
1127 o_cleanup: echo_cleanup,
1128 o_iocontrol: echo_iocontrol,
1129 o_connect: echo_connect,
1130 o_disconnect: echo_disconnect
/* Register the echo client device type under OBD_ECHO_CLIENT_DEVICENAME,
 * using echo_obd_ops and the module-level lprocfs variables.  Returns the
 * result of class_register_type(). */
1133 int echo_client_init(void)
1135 struct lprocfs_static_vars lvars;
1137 lprocfs_init_vars(&lvars);
1138 return class_register_type(&echo_obd_ops, lvars.module_vars,
1139 OBD_ECHO_CLIENT_DEVICENAME);
/* Unregister the echo client device type registered by echo_client_init(). */
1142 void echo_client_cleanup(void)
1144 class_unregister_type(OBD_ECHO_CLIENT_DEVICENAME);