*
* Copyright (c) 2001-2003 Cluster File Systems, Inc.
*
- * This file is part of Lustre, http://www.lustre.org.
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
*
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
*
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
*
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
*/
#define DEBUG_SUBSYSTEM S_ECHO
#ifdef __KERNEL__
-#include <linux/version.h>
-#include <linux/module.h>
-#include <linux/fs.h>
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-#include <linux/iobuf.h>
-#endif
-#include <asm/div64.h>
+#include <libcfs/libcfs.h>
#else
#include <liblustre.h>
#endif
-#include <linux/obd.h>
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/obd_echo.h>
-#include <linux/lustre_debug.h>
-#include <linux/lprocfs_status.h>
-#include <linux/lustre_lite.h> /* for LL_IOC_LOV_SETSTRIPE */
+#include <obd.h>
+#include <obd_support.h>
+#include <obd_class.h>
+#include <obd_echo.h>
+#include <lustre_debug.h>
+#include <lprocfs_status.h>
+
+static obd_id last_object_id;
#if 0
static void
struct lov_stripe_md *lsm = eco->eco_lsm;
int i;
- printk (KERN_INFO "%s: object %p: "LPX64", refs %d%s: "LPX64
- "=%u!%u@%d\n", msg, eco, eco->eco_id, eco->eco_refcount,
+ printk (KERN_INFO "Lustre: %s: object %p: "LPX64", refs %d%s: "LPX64
+ "=%u!%u\n", msg, eco, eco->eco_id, eco->eco_refcount,
eco->eco_deleted ? "(deleted) " : "",
lsm->lsm_object_id, lsm->lsm_stripe_size,
- lsm->lsm_stripe_count, lsm->lsm_stripe_offset);
+ lsm->lsm_stripe_count);
for (i = 0; i < lsm->lsm_stripe_count; i++)
- printk (KERN_INFO " [%2u]"LPX64"\n",
+ printk (KERN_INFO "Lustre: @%2u:"LPX64"\n",
lsm->lsm_oinfo[i].loi_ost_idx,
lsm->lsm_oinfo[i].loi_id);
}
}
static int
-echo_copyout_lsm (struct lov_stripe_md *lsm, void *ulsm, int ulsm_nob)
+echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
{
- int nob;
+ struct lov_stripe_md *ulsm = _ulsm;
+ int nob, i;
nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
if (nob > ulsm_nob)
return (-EINVAL);
- if (copy_to_user (ulsm, lsm, nob))
+ if (copy_to_user (ulsm, lsm, sizeof(ulsm)))
return (-EFAULT);
- return (0);
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ if (copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
+ sizeof(lsm->lsm_oinfo[0])))
+ return (-EFAULT);
+ }
+ return 0;
}
static int
void *ulsm, int ulsm_nob)
{
struct echo_client_obd *ec = &obd->u.echo_client;
- int nob;
+ int i;
if (ulsm_nob < sizeof (*lsm))
return (-EINVAL);
if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
return (-EFAULT);
- nob = lsm->lsm_stripe_count * sizeof (lsm->lsm_oinfo[0]);
-
- if (ulsm_nob < nob ||
- lsm->lsm_stripe_count > ec->ec_nstripes ||
+ if (lsm->lsm_stripe_count > ec->ec_nstripes ||
lsm->lsm_magic != LOV_MAGIC ||
- (lsm->lsm_stripe_offset != 0 &&
- lsm->lsm_stripe_offset != 0xffffffff &&
- lsm->lsm_stripe_offset >= ec->ec_nstripes) ||
- (lsm->lsm_stripe_size & (PAGE_SIZE - 1)) != 0 ||
+ (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
return (-EINVAL);
- LASSERT (ec->ec_lsmsize >= sizeof (*lsm) + nob);
-
- if (copy_from_user(lsm->lsm_oinfo,
- ((struct lov_stripe_md *)ulsm)->lsm_oinfo, nob))
- return (-EFAULT);
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ if (copy_from_user(lsm->lsm_oinfo[i],
+ ((struct lov_stripe_md *)ulsm)->lsm_oinfo[i],
+ sizeof(lsm->lsm_oinfo[0])))
+ return (-EFAULT);
+ }
return (0);
}
{
struct echo_client_obd *ec = &obd->u.echo_client;
struct ec_object *eco;
+ int rc;
- OBD_ALLOC (eco, sizeof (*eco));
+ OBD_ALLOC(eco, sizeof (*eco));
if (eco == NULL)
- return (NULL);
+ return NULL;
- OBD_ALLOC (eco->eco_lsm, ec->ec_lsmsize);
- if (eco->eco_lsm == NULL) {
- OBD_FREE (eco, sizeof (*eco));
- return (NULL);
+ rc = obd_alloc_memmd(ec->ec_exp, &eco->eco_lsm);
+ if (rc < 0) {
+ OBD_FREE(eco, sizeof (*eco));
+ return NULL;
}
eco->eco_device = obd;
struct echo_client_obd *ec = &obd->u.echo_client;
LASSERT (eco->eco_refcount == 0);
- OBD_FREE (eco->eco_lsm, ec->ec_lsmsize);
+ if (!eco->eco_lsm)
+ CERROR("No object %s\n", obd->obd_name);
+ else
+ obd_free_memmd(ec->ec_exp, &eco->eco_lsm);
OBD_FREE (eco, sizeof (*eco));
}
-static int
-echo_create_object (struct obd_device *obd, int on_target, struct obdo *oa,
- void *ulsm, int ulsm_nob)
+static int echo_create_object(struct obd_device *obd, int on_target,
+ struct obdo *oa, void *ulsm, int ulsm_nob,
+ struct obd_trans_info *oti)
{
struct echo_client_obd *ec = &obd->u.echo_client;
struct ec_object *eco2;
struct ec_object *eco;
struct lov_stripe_md *lsm;
int rc;
- int i;
+ int i, idx;
if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
(on_target || /* set_stripe */
return (-EINVAL);
}
- eco = echo_allocate_object (obd);
- if (eco == NULL)
- return (-ENOMEM);
+ if (ulsm != NULL) {
+ eco = echo_allocate_object (obd);
+ if (eco == NULL)
+ return (-ENOMEM);
- lsm = eco->eco_lsm;
+ lsm = eco->eco_lsm;
- if (ulsm != NULL) {
rc = echo_copyin_lsm (obd, lsm, ulsm, ulsm_nob);
if (rc != 0)
goto failed;
- }
- /* setup object ID here for !on_target and LOV hint */
- if ((oa->o_valid & OBD_MD_FLID) != 0)
- eco->eco_id = lsm->lsm_object_id = oa->o_id;
+ /* setup object ID here for !on_target and LOV hint */
+ if ((oa->o_valid & OBD_MD_FLID) != 0)
+ eco->eco_id = lsm->lsm_object_id = oa->o_id;
- /* defaults -> actual values */
- if (lsm->lsm_stripe_offset == 0xffffffff)
- lsm->lsm_stripe_offset = 0;
+ if (lsm->lsm_stripe_count == 0)
+ lsm->lsm_stripe_count = ec->ec_nstripes;
- if (lsm->lsm_stripe_count == 0)
- lsm->lsm_stripe_count = ec->ec_nstripes;
+ if (lsm->lsm_stripe_size == 0)
+ lsm->lsm_stripe_size = CFS_PAGE_SIZE;
- if (lsm->lsm_stripe_size == 0)
- lsm->lsm_stripe_size = PAGE_SIZE;
+ idx = ll_rand();
- /* setup stripes: indices + default ids if required */
- for (i = 0; i < lsm->lsm_stripe_count; i++) {
- if (lsm->lsm_oinfo[i].loi_id == 0)
- lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id;
+ /* setup stripes: indices + default ids if required */
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ if (lsm->lsm_oinfo[i]->loi_id == 0)
+ lsm->lsm_oinfo[i]->loi_id = lsm->lsm_object_id;
- lsm->lsm_oinfo[i].loi_ost_idx =
- (lsm->lsm_stripe_offset + i) % ec->ec_nstripes;
+ lsm->lsm_oinfo[i]->loi_ost_idx =
+ (idx + i) % ec->ec_nstripes;
+ }
+ } else {
+ OBD_ALLOC(eco, sizeof(*eco));
+ if (!eco)
+ return (-ENOMEM);
+ eco->eco_device = obd;
+ lsm = NULL;
}
+ if (oa->o_id == 0)
+ oa->o_id = ++last_object_id;
+
if (on_target) {
- rc = obd_create (&ec->ec_conn, oa, &lsm, NULL);
+ oa->o_gr = FILTER_GROUP_ECHO;
+ oa->o_valid |= OBD_MD_FLGROUP;
+
+ rc = obd_create(ec->ec_exp, oa, &lsm, oti);
if (rc != 0)
goto failed;
/* See what object ID we were given */
- LASSERT ((oa->o_valid & OBD_MD_FLID) != 0);
- eco->eco_id = lsm->lsm_object_id = oa->o_id;
+ eco->eco_id = oa->o_id = lsm->lsm_object_id;
+ oa->o_valid |= OBD_MD_FLID;
+
+ LASSERT(eco->eco_lsm == NULL || eco->eco_lsm == lsm);
+ eco->eco_lsm = lsm;
}
spin_lock (&ec->ec_lock);
oa->o_id, on_target ? " (undoing create)" : "");
if (on_target)
- obd_destroy (&ec->ec_conn, oa, lsm, NULL);
+ obd_destroy(ec->ec_exp, oa, lsm, oti, NULL);
rc = -EEXIST;
goto failed;
list_add (&eco->eco_obj_chain, &ec->ec_objects);
spin_unlock (&ec->ec_lock);
CDEBUG (D_INFO,
- "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ "created %p: "LPX64"=%u#%u@%u refs %d del %d\n",
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_stripe_offset,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
return (0);
failed:
echo_free_object (eco);
+ if (rc)
+ CERROR("%s: err %d on create\n", obd->obd_name, rc);
return (rc);
}
struct ec_object *eco2;
int rc;
- if ((oa->o_valid & OBD_MD_FLID) == 0)
+ if ((oa->o_valid & OBD_MD_FLID) == 0 ||
+ oa->o_id == 0) /* disallow use of object id 0 */
{
CERROR ("No valid oid\n");
return (-EINVAL);
spin_lock (&ec->ec_lock);
eco = echo_find_object_locked (obd, oa->o_id);
if (eco != NULL) {
- if (eco->eco_deleted) /* being deleted */
- return (-EAGAIN); /* (see comment in cleanup) */
+ if (eco->eco_deleted) { /* being deleted */
+ spin_unlock(&ec->ec_lock);/* (see comment in cleanup) */
+ return (-EAGAIN);
+ }
eco->eco_refcount++;
spin_unlock (&ec->ec_lock);
*ecop = eco;
CDEBUG (D_INFO,
- "found %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ "found %p: "LPX64"=%u#%u@%u refs %d del %d\n",
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_stripe_offset,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
return (0);
}
eco->eco_refcount = 1;
*ecop = eco;
CDEBUG (D_INFO,
- "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ "created %p: "LPX64"=%u#%u@%d refs %d del %d\n",
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_stripe_offset,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
return (0);
}
rc = 0;
LASSERT (eco2->eco_id == eco2->eco_lsm->lsm_object_id);
CDEBUG (D_INFO,
- "found(2) %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ "found(2) %p: "LPX64"=%u#%u@%d refs %d del %d\n",
eco2, eco2->eco_id,
eco2->eco_lsm->lsm_stripe_size,
eco2->eco_lsm->lsm_stripe_count,
- eco2->eco_lsm->lsm_stripe_offset,
+ eco2->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco2->eco_refcount, eco2->eco_deleted);
}
eco->eco_refcount--;
LASSERT (eco->eco_refcount >= 0);
- CDEBUG(D_INFO, "put %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ CDEBUG(D_INFO, "put %p: "LPX64"=%u#%u@%d refs %d del %d\n",
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_stripe_offset,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
if (eco->eco_refcount != 0 || !eco->eco_deleted) {
* attempting to enqueue on this object number until we can be
* sure there will be no more lock callbacks.
*/
- obd_cancel_unused(&ec->ec_conn, eco->eco_lsm, 0, NULL);
+ obd_cancel_unused(ec->ec_exp, eco->eco_lsm, 0, NULL);
/* now we can let it go */
spin_lock (&ec->ec_lock);
stripe_index = woffset / stripe_size;
- *idp = lsm->lsm_oinfo[stripe_index].loi_id;
+ *idp = lsm->lsm_oinfo[stripe_index]->loi_id;
*offp = offset * stripe_size + woffset % stripe_size;
}
-static int
-echo_client_kbrw (struct obd_device *obd, int rw,
- struct obdo *oa, struct lov_stripe_md *lsm,
- obd_off offset, obd_size count)
+static void
+echo_client_page_debug_setup(struct lov_stripe_md *lsm,
+ cfs_page_t *page, int rw, obd_id id,
+ obd_off offset, obd_off count)
+{
+ char *addr;
+ obd_off stripe_off;
+ obd_id stripe_id;
+ int delta;
+
+ /* no partial pages on the client */
+ LASSERT(count == CFS_PAGE_SIZE);
+
+ addr = cfs_kmap(page);
+
+ for (delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+ if (rw == OBD_BRW_WRITE) {
+ stripe_off = offset + delta;
+ stripe_id = id;
+ echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
+ } else {
+ stripe_off = 0xdeadbeef00c0ffeeULL;
+ stripe_id = 0xdeadbeef00c0ffeeULL;
+ }
+ block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
+ stripe_off, stripe_id);
+ }
+
+ cfs_kunmap(page);
+}
+
+static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
+ cfs_page_t *page, obd_id id,
+ obd_off offset, obd_off count)
+{
+ obd_off stripe_off;
+ obd_id stripe_id;
+ char *addr;
+ int delta;
+ int rc;
+ int rc2;
+
+ /* no partial pages on the client */
+ LASSERT(count == CFS_PAGE_SIZE);
+
+ addr = cfs_kmap(page);
+
+ for (rc = delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+ stripe_off = offset + delta;
+ stripe_id = id;
+ echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
+
+ rc2 = block_debug_check("test_brw",
+ addr + delta, OBD_ECHO_BLOCK_SIZE,
+ stripe_off, stripe_id);
+ if (rc2 != 0) {
+ CERROR ("Error in echo object "LPX64"\n", id);
+ rc = rc2;
+ }
+ }
+
+ cfs_kunmap(page);
+ return rc;
+}
+
+static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
+ struct lov_stripe_md *lsm, obd_off offset,
+ obd_size count, struct obd_trans_info *oti)
{
struct echo_client_obd *ec = &obd->u.echo_client;
+ struct obd_info oinfo = { { { 0 } } };
obd_count npages;
struct brw_page *pga;
struct brw_page *pgp;
int verify;
int gfp_mask;
- /* oa_id == 0 => speed test (no verification) else...
- * oa & 1 => use HIGHMEM
- */
- verify = (oa->o_id != 0);
- gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+ verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
+ (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+
+ gfp_mask = ((oa->o_id & 2) == 0) ? CFS_ALLOC_STD : CFS_ALLOC_HIGHUSER;
LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
+ LASSERT(lsm != NULL);
+ LASSERT(lsm->lsm_object_id == oa->o_id);
if (count <= 0 ||
- (count & (PAGE_SIZE - 1)) != 0 ||
- (lsm != NULL &&
- lsm->lsm_object_id != oa->o_id))
+ (count & (~CFS_PAGE_MASK)) != 0)
return (-EINVAL);
/* XXX think again with misaligned I/O */
- npages = count >> PAGE_SHIFT;
+ npages = count >> CFS_PAGE_SHIFT;
OBD_ALLOC(pga, npages * sizeof(*pga));
if (pga == NULL)
for (i = 0, pgp = pga, off = offset;
i < npages;
- i++, pgp++, off += PAGE_SIZE) {
+ i++, pgp++, off += CFS_PAGE_SIZE) {
LASSERT (pgp->pg == NULL); /* for cleanup */
rc = -ENOMEM;
- pgp->pg = alloc_pages (gfp_mask, 0);
+ OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
if (pgp->pg == NULL)
goto out;
- pgp->count = PAGE_SIZE;
+ pgp->count = CFS_PAGE_SIZE;
pgp->off = off;
pgp->flag = 0;
- if (verify) {
- void *addr = kmap(pgp->pg);
- obd_off stripe_off = off;
- obd_id stripe_id = oa->o_id;
-
- if (rw == OBD_BRW_WRITE) {
- echo_get_stripe_off_id(lsm, &stripe_off,
- &stripe_id);
- page_debug_setup(addr, pgp->count,
- stripe_off, stripe_id);
- } else {
- page_debug_setup(addr, pgp->count,
- 0xdeadbeef00c0ffee,
- 0xdeadbeef00c0ffee);
- }
- kunmap(pgp->pg);
- }
+ if (verify)
+ echo_client_page_debug_setup(lsm, pgp->pg, rw,
+ oa->o_id, off, pgp->count);
}
- rc = obd_brw(rw, &ec->ec_conn, oa, lsm, npages, pga, NULL);
+ oinfo.oi_oa = oa;
+ oinfo.oi_md = lsm;
+ rc = obd_brw(rw, ec->ec_exp, &oinfo, npages, pga, oti);
out:
- if (rc != 0)
+ if (rc != 0 || rw != OBD_BRW_READ)
verify = 0;
for (i = 0, pgp = pga; i < npages; i++, pgp++) {
continue;
if (verify) {
- void *addr = kmap(pgp->pg);
- obd_off stripe_off = pgp->off;
- obd_id stripe_id = oa->o_id;
- int vrc;
-
- echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
- vrc = page_debug_check("test_brw", addr, pgp->count,
- stripe_off, stripe_id);
+ int vrc;
+ vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id,
+ pgp->off, pgp->count);
if (vrc != 0 && rc == 0)
rc = vrc;
-
- kunmap(pgp->pg);
}
- __free_pages(pgp->pg, 0);
+ OBD_PAGE_FREE(pgp->pg);
}
OBD_FREE(pga, npages * sizeof(*pga));
return (rc);
}
#ifdef __KERNEL__
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
static int echo_client_ubrw(struct obd_device *obd, int rw,
struct obdo *oa, struct lov_stripe_md *lsm,
- obd_off offset, obd_size count, char *buffer)
+ obd_off offset, obd_size count, char *buffer,
+ struct obd_trans_info *oti)
{
- struct echo_client_obd *ec = &obd->u.echo_client;
- obd_count npages;
- struct brw_page *pga;
- struct brw_page *pgp;
- obd_off off;
- struct kiobuf *kiobuf;
- int i;
- int rc;
+#warning "echo_client_ubrw() needs to be ported on 2.6 yet"
+ LBUG();
+ return 0;
+}
+#endif
+
+struct echo_async_state;
+
+#define EAP_MAGIC 79277927
+struct echo_async_page {
+ int eap_magic;
+ cfs_page_t *eap_page;
+ void *eap_cookie;
+ obd_off eap_off;
+ struct echo_async_state *eap_eas;
+ struct list_head eap_item;
+};
+
+#define EAP_FROM_COOKIE(c) \
+ (LASSERT(((struct echo_async_page *)(c))->eap_magic == EAP_MAGIC), \
+ (struct echo_async_page *)(c))
+
+struct echo_async_state {
+ spinlock_t eas_lock;
+ obd_off eas_next_offset;
+ obd_off eas_end_offset;
+ int eas_in_flight;
+ int eas_rc;
+ cfs_waitq_t eas_waitq;
+ struct list_head eas_avail;
+ struct obdo eas_oa;
+ struct lov_stripe_md *eas_lsm;
+};
+
+static int eas_should_wake(struct echo_async_state *eas)
+{
+ int rc = 0;
- LASSERT (rw == OBD_BRW_WRITE ||
- rw == OBD_BRW_READ);
+ spin_lock(&eas->eas_lock);
+ if (eas->eas_rc == 0 && !list_empty(&eas->eas_avail))
+ rc = 1;
+ spin_unlock(&eas->eas_lock);
+ return rc;
+};
- /* NB: for now, only whole pages, page aligned */
+static int ec_ap_make_ready(void *data, int cmd)
+{
+ /* our pages are issued ready */
+ LBUG();
+ return 0;
+}
+static int ec_ap_refresh_count(void *data, int cmd)
+{
+ /* our pages are issued with a stable count */
+ LBUG();
+ return CFS_PAGE_SIZE;
+}
+static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
+{
+ struct echo_async_page *eap = EAP_FROM_COOKIE(data);
+
+ memcpy(oa, &eap->eap_eas->eas_oa, sizeof(*oa));
+}
+
+static int ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
+{
+ struct echo_async_page *eap = EAP_FROM_COOKIE(data);
+ struct echo_async_state *eas;
+
+ eas = eap->eap_eas;
+
+ if (cmd == OBD_BRW_READ &&
+ eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID &&
+ (eas->eas_oa.o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (eas->eas_oa.o_flags & OBD_FL_DEBUG_CHECK) != 0)
+ echo_client_page_debug_check(eas->eas_lsm, eap->eap_page,
+ eas->eas_oa.o_id, eap->eap_off,
+ CFS_PAGE_SIZE);
+
+ spin_lock(&eas->eas_lock);
+ if (rc && !eas->eas_rc)
+ eas->eas_rc = rc;
+ eas->eas_in_flight--;
+ list_add(&eap->eap_item, &eas->eas_avail);
+ cfs_waitq_signal(&eas->eas_waitq);
+ spin_unlock(&eas->eas_lock);
+ return 0;
+}
+
+static struct obd_async_page_ops ec_async_page_ops = {
+ .ap_make_ready = ec_ap_make_ready,
+ .ap_refresh_count = ec_ap_refresh_count,
+ .ap_fill_obdo = ec_ap_fill_obdo,
+ .ap_completion = ec_ap_completion,
+};
+
+static int echo_client_async_page(struct obd_export *exp, int rw,
+ struct obdo *oa, struct lov_stripe_md *lsm,
+ obd_off offset, obd_size count,
+ obd_size batching)
+{
+ obd_count npages, i;
+ struct echo_async_page *eap;
+ struct echo_async_state eas;
+ int rc = 0;
+ struct echo_async_page **aps = NULL;
+
+ ENTRY;
+#if 0
+ int verify;
+ int gfp_mask;
+
+ verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
+ (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+
+ gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+#endif
+
+ LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
if (count <= 0 ||
- ((long)buffer & (PAGE_SIZE - 1)) != 0 ||
- (count & (PAGE_SIZE - 1)) != 0 ||
- (lsm != NULL && lsm->lsm_object_id != oa->o_id))
+ (count & (~CFS_PAGE_MASK)) != 0 ||
+ (lsm != NULL &&
+ lsm->lsm_object_id != oa->o_id))
return (-EINVAL);
/* XXX think again with misaligned I/O */
- npages = count >> PAGE_SHIFT;
-
- OBD_ALLOC(pga, npages * sizeof(*pga));
- if (pga == NULL)
+ npages = batching >> CFS_PAGE_SHIFT;
+
+ memcpy(&eas.eas_oa, oa, sizeof(*oa));
+ eas.eas_next_offset = offset;
+ eas.eas_end_offset = offset + count;
+ spin_lock_init(&eas.eas_lock);
+ cfs_waitq_init(&eas.eas_waitq);
+ eas.eas_in_flight = 0;
+ eas.eas_rc = 0;
+ eas.eas_lsm = lsm;
+ CFS_INIT_LIST_HEAD(&eas.eas_avail);
+
+ OBD_ALLOC(aps, npages * sizeof aps[0]);
+ if (aps == NULL)
return (-ENOMEM);
- rc = alloc_kiovec (1, &kiobuf);
- if (rc != 0)
- goto out_1;
+ /* prepare the group of pages that we're going to be keeping
+ * in flight */
+ for (i = 0; i < npages; i++) {
+ cfs_page_t *page;
+ OBD_PAGE_ALLOC(page, CFS_ALLOC_STD);
+ if (page == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ OBD_ALLOC(eap, sizeof(*eap));
+ if (eap == NULL) {
+ OBD_PAGE_FREE(page);
+ GOTO(out, rc = -ENOMEM);
+ }
- rc = map_user_kiobuf ((rw == OBD_BRW_READ) ? READ : WRITE,
- kiobuf, (unsigned long)buffer, count);
- if (rc != 0)
- goto out_2;
+ eap->eap_magic = EAP_MAGIC;
+ eap->eap_page = page;
+ eap->eap_eas = &eas;
+ list_add_tail(&eap->eap_item, &eas.eas_avail);
+ aps[i] = eap;
+ }
- LASSERT (kiobuf->offset == 0);
- LASSERT (kiobuf->nr_pages == npages);
+ /* first we spin queueing io and being woken by its completion */
+ spin_lock(&eas.eas_lock);
+ for(;;) {
+ int rc;
+
+ /* sleep until we have a page to send */
+ spin_unlock(&eas.eas_lock);
+ rc = wait_event_interruptible(eas.eas_waitq,
+ eas_should_wake(&eas));
+ spin_lock(&eas.eas_lock);
+ if (rc && !eas.eas_rc)
+ eas.eas_rc = rc;
+ if (eas.eas_rc)
+ break;
+ if (list_empty(&eas.eas_avail))
+ continue;
+ eap = list_entry(eas.eas_avail.next, struct echo_async_page,
+ eap_item);
+ list_del(&eap->eap_item);
+ spin_unlock(&eas.eas_lock);
+
+ /* unbind the eap from its old page offset */
+ if (eap->eap_cookie != NULL) {
+ obd_teardown_async_page(exp, lsm, NULL,
+ eap->eap_cookie);
+ eap->eap_cookie = NULL;
+ }
- for (i = 0, off = offset, pgp = pga;
- i < npages;
- i++, off += PAGE_SIZE, pgp++) {
- pgp->off = off;
- pgp->pg = kiobuf->maplist[i];
- pgp->count = PAGE_SIZE;
- pgp->flag = 0;
+ eas.eas_next_offset += CFS_PAGE_SIZE;
+ eap->eap_off = eas.eas_next_offset;
+
+ rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
+ eap->eap_off, &ec_async_page_ops,
+ eap, &eap->eap_cookie);
+ if (rc) {
+ spin_lock(&eas.eas_lock);
+ eas.eas_rc = rc;
+ break;
+ }
+
+ if (oa->o_id != ECHO_PERSISTENT_OBJID &&
+ (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0)
+ echo_client_page_debug_setup(lsm, eap->eap_page, rw,
+ oa->o_id,
+ eap->eap_off, CFS_PAGE_SIZE);
+
+ /* always asserts urgent, which isn't quite right */
+ rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie,
+ rw, 0, CFS_PAGE_SIZE, 0,
+ ASYNC_READY | ASYNC_URGENT |
+ ASYNC_COUNT_STABLE);
+ spin_lock(&eas.eas_lock);
+ if (rc && !eas.eas_rc) {
+ eas.eas_rc = rc;
+ break;
+ }
+ eas.eas_in_flight++;
+ if (eas.eas_next_offset == eas.eas_end_offset)
+ break;
}
- rc = obd_brw(rw, &ec->ec_conn, oa, lsm, npages, pga, NULL);
+ /* still hold the eas_lock here.. */
- // if (rw == OBD_BRW_READ)
- // mark_dirty_kiobuf (kiobuf, count);
+ /* now we just spin waiting for all the rpcs to complete */
+ while(eas.eas_in_flight) {
+ spin_unlock(&eas.eas_lock);
+ wait_event_interruptible(eas.eas_waitq,
+ eas.eas_in_flight == 0);
+ spin_lock(&eas.eas_lock);
+ }
+ spin_unlock(&eas.eas_lock);
+
+out:
+ if (aps != NULL) {
+ for (i = 0; i < npages; ++ i) {
+ cfs_page_t *page;
+
+ eap = aps[i];
+ page = eap->eap_page;
+ if (eap->eap_cookie != NULL)
+ obd_teardown_async_page(exp, lsm, NULL,
+ eap->eap_cookie);
+ OBD_FREE(eap, sizeof(*eap));
+ OBD_PAGE_FREE(page);
+ }
+ OBD_FREE(aps, npages * sizeof aps[0]);
+ }
- unmap_kiobuf (kiobuf);
- out_2:
- free_kiovec (1, &kiobuf);
- out_1:
- OBD_FREE(pga, npages * sizeof(*pga));
- return (rc);
-}
-#else
-static int echo_client_ubrw(struct obd_device *obd, int rw,
- struct obdo *oa, struct lov_stripe_md *lsm,
- obd_off offset, obd_size count, char *buffer)
-{
- LBUG();
- return 0;
+ RETURN(rc);
}
-#endif
-#endif
-static int
-echo_open (struct obd_export *exp, struct obdo *oa)
+static int echo_client_prep_commit(struct obd_export *exp, int rw,
+ struct obdo *oa, struct lov_stripe_md *lsm,
+ obd_off offset, obd_size count,
+ obd_size batch, struct obd_trans_info *oti)
{
- struct obd_device *obd = exp->exp_obd;
- struct echo_client_obd *ec = &obd->u.echo_client;
- struct lustre_handle *ufh = obdo_handle (oa);
- struct ec_open_object *ecoo;
- struct ec_object *eco;
- int rc;
+ struct obd_ioobj ioo;
+ struct niobuf_local *lnb;
+ struct niobuf_remote *rnb;
+ obd_off off;
+ obd_size npages, tot_pages;
+ int i, ret = 0;
+ ENTRY;
- rc = echo_get_object (&eco, obd, oa);
- if (rc != 0)
- return rc;
+ if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
+ (lsm != NULL && lsm->lsm_object_id != oa->o_id))
+ RETURN(-EINVAL);
- rc = -ENOMEM;
- OBD_ALLOC (ecoo, sizeof (*ecoo));
- if (ecoo == NULL)
- goto failed_0;
+ npages = batch >> CFS_PAGE_SHIFT;
+ tot_pages = count >> CFS_PAGE_SHIFT;
- rc = obd_open(&ec->ec_conn, oa, eco->eco_lsm, NULL, &ecoo->ecoo_och);
- if (rc != 0)
- goto failed_1;
+ OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
+ OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
- memcpy (&ecoo->ecoo_oa, oa, sizeof (*oa));
- ecoo->ecoo_object = eco;
- /* ecoo takes ref from echo_get_object() above */
+ if (lnb == NULL || rnb == NULL)
+ GOTO(out, ret = -ENOMEM);
- spin_lock (&ec->ec_lock);
+ obdo_to_ioobj(oa, &ioo);
- list_add (&ecoo->ecoo_exp_chain, &exp->exp_ec_data.eced_open_head);
- ufh->cookie = ecoo->ecoo_cookie = ec->ec_unique++;
- spin_unlock (&ec->ec_lock);
- return 0;
+ off = offset;
- failed_1:
- OBD_FREE (ecoo, sizeof (*ecoo));
- failed_0:
- echo_put_object (eco);
- return (rc);
-}
+ for(; tot_pages; tot_pages -= npages) {
+ if (tot_pages < npages)
+ npages = tot_pages;
-static int
-echo_close (struct obd_export *exp, struct obdo *oa)
-{
- struct obd_device *obd = exp->exp_obd;
- struct echo_client_obd *ec = &obd->u.echo_client;
- struct lustre_handle *ufh = obdo_handle (oa);
- struct ec_open_object *ecoo = NULL;
- int found = 0;
- struct list_head *el;
- int rc;
+ for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) {
+ rnb[i].offset = off;
+ rnb[i].len = CFS_PAGE_SIZE;
+ }
- if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
- return -EINVAL;
+ ioo.ioo_bufcnt = npages;
+ oti->oti_transno = 0;
- spin_lock (&ec->ec_lock);
+ ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti,
+ NULL);
+ if (ret != 0)
+ GOTO(out, ret);
- list_for_each (el, &exp->exp_ec_data.eced_open_head) {
- ecoo = list_entry (el, struct ec_open_object, ecoo_exp_chain);
- found = (ecoo->ecoo_cookie == ufh->cookie);
- if (found) {
- list_del (&ecoo->ecoo_exp_chain);
- break;
+ for (i = 0; i < npages; i++) {
+ cfs_page_t *page = lnb[i].page;
+
+ /* read past eof? */
+ if (page == NULL && lnb[i].rc == 0)
+ continue;
+
+ if (oa->o_id == ECHO_PERSISTENT_OBJID ||
+ (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
+ continue;
+
+ if (rw == OBD_BRW_WRITE)
+ echo_client_page_debug_setup(lsm, page, rw,
+ oa->o_id,
+ rnb[i].offset,
+ rnb[i].len);
+ else
+ echo_client_page_debug_check(lsm, page,
+ oa->o_id,
+ rnb[i].offset,
+ rnb[i].len);
}
+
+ ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
+ if (ret != 0)
+ GOTO(out, ret);
}
- spin_unlock (&ec->ec_lock);
+out:
+ if (lnb)
+ OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
+ if (rnb)
+ OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
+ RETURN(ret);
+}
- memcpy(&ecoo->ecoo_oa.o_inline, &ecoo->ecoo_och, FD_OSTDATA_SIZE);
- ecoo->ecoo_oa.o_valid |= OBD_MD_FLHANDLE;
+int echo_client_brw_ioctl(int rw, struct obd_export *exp,
+ struct obd_ioctl_data *data)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct echo_client_obd *ec = &obd->u.echo_client;
+ struct obd_trans_info dummy_oti = { .oti_thread_id = -1 };
+ struct ec_object *eco;
+ int rc;
+ ENTRY;
- rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
- ecoo->ecoo_object->eco_lsm, NULL);
+ rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
+ if (rc)
+ RETURN(rc);
- echo_put_object (ecoo->ecoo_object);
- OBD_FREE (ecoo, sizeof (*ecoo));
+ data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE;
+ data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP;
+ data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO;
- return (rc);
+ switch((long)data->ioc_pbuf1) {
+ case 1:
+ if (data->ioc_pbuf2 == NULL) { // NULL user data pointer
+ rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
+ eco->eco_lsm, data->ioc_offset,
+ data->ioc_count, &dummy_oti);
+ } else {
+#ifdef __KERNEL__
+ rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
+ eco->eco_lsm, data->ioc_offset,
+ data->ioc_count, data->ioc_pbuf2,
+ &dummy_oti);
+#endif
+ }
+ break;
+ case 2:
+ rc = echo_client_async_page(ec->ec_exp, rw, &data->ioc_obdo1,
+ eco->eco_lsm, data->ioc_offset,
+ data->ioc_count, data->ioc_plen1);
+ break;
+ case 3:
+ rc = echo_client_prep_commit(ec->ec_exp, rw, &data->ioc_obdo1,
+ eco->eco_lsm, data->ioc_offset,
+ data->ioc_count, data->ioc_plen1,
+ &dummy_oti);
+ break;
+ default:
+ rc = -EINVAL;
+ }
+ echo_put_object(eco);
+ RETURN(rc);
}
static int
}
static int
-echo_enqueue (struct obd_export *exp, struct obdo *oa,
- int mode, obd_off offset, obd_size nob)
+echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
+ int mode, obd_off offset, obd_size nob)
{
struct obd_device *obd = exp->exp_obd;
struct echo_client_obd *ec = &obd->u.echo_client;
struct lustre_handle *ulh = obdo_handle (oa);
+ struct ldlm_enqueue_info einfo = { 0 };
+ struct obd_info oinfo = { { { 0 } } };
struct ec_object *eco;
struct ec_lock *ecl;
- int flags;
int rc;
if (!(mode == LCK_PR || mode == LCK_PW))
return -EINVAL;
- if ((offset & (PAGE_SIZE - 1)) != 0 ||
- (nob & (PAGE_SIZE - 1)) != 0)
+ if ((offset & (~CFS_PAGE_MASK)) != 0 ||
+ (nob & (~CFS_PAGE_MASK)) != 0)
return -EINVAL;
rc = echo_get_object (&eco, obd, oa);
ecl->ecl_mode = mode;
ecl->ecl_object = eco;
- ecl->ecl_extent.start = offset;
- ecl->ecl_extent.end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
-
- flags = 0;
- rc = obd_enqueue(&ec->ec_conn, eco->eco_lsm, NULL, LDLM_EXTENT,
- &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode,
- &flags, echo_ldlm_callback, eco,
- &ecl->ecl_lock_handle);
+ ecl->ecl_policy.l_extent.start = offset;
+ ecl->ecl_policy.l_extent.end =
+ (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
+
+ einfo.ei_type = LDLM_EXTENT;
+ einfo.ei_mode = mode;
+ einfo.ei_cb_bl = echo_ldlm_callback;
+ einfo.ei_cb_cp = ldlm_completion_ast;
+ einfo.ei_cb_gl = NULL;
+ einfo.ei_cbdata = eco;
+
+ oinfo.oi_policy = ecl->ecl_policy;
+ oinfo.oi_lockh = &ecl->ecl_lock_handle;
+ oinfo.oi_md = eco->eco_lsm;
+ rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo, NULL);
if (rc != 0)
goto failed_1;
}
static int
-echo_cancel (struct obd_export *exp, struct obdo *oa)
+echo_client_cancel(struct obd_export *exp, struct obdo *oa)
{
struct obd_device *obd = exp->exp_obd;
struct echo_client_obd *ec = &obd->u.echo_client;
if (!found)
return (-ENOENT);
- rc = obd_cancel(&ec->ec_conn, ecl->ecl_object->eco_lsm, ecl->ecl_mode,
+ rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm, ecl->ecl_mode,
&ecl->ecl_lock_handle);
echo_put_object (ecl->ecl_object);
return rc;
}
-static int echo_iocontrol(unsigned int cmd, struct lustre_handle *obdconn,
- int len, void *karg, void *uarg)
+static int
+echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
+ int len, void *karg, void *uarg)
{
- struct obd_export *exp = class_conn2export (obdconn);
struct obd_device *obd;
struct echo_client_obd *ec;
struct ec_object *eco;
struct obd_ioctl_data *data = karg;
+ struct obd_trans_info dummy_oti;
+ struct oti_req_ack_lock *ack_lock;
+ struct obdo *oa;
int rw = OBD_BRW_READ;
int rc = 0;
+ int i;
ENTRY;
- if (exp == NULL) {
- CERROR("ioctl: No device\n");
- GOTO(out, rc = -EINVAL);
- }
+ unlock_kernel();
+
+ memset(&dummy_oti, 0, sizeof(dummy_oti));
obd = exp->exp_obd;
ec = &obd->u.echo_client;
GOTO (out, rc = -EPERM);
rc = echo_create_object (obd, 1, &data->ioc_obdo1,
- data->ioc_pbuf1, data->ioc_plen1);
+ data->ioc_pbuf1, data->ioc_plen1,
+ &dummy_oti);
GOTO(out, rc);
case OBD_IOC_DESTROY:
rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
if (rc == 0) {
- rc = obd_destroy(&ec->ec_conn, &data->ioc_obdo1,
- eco->eco_lsm, NULL);
+ oa = &data->ioc_obdo1;
+ oa->o_gr = FILTER_GROUP_ECHO;
+ oa->o_valid |= OBD_MD_FLGROUP;
+ rc = obd_destroy(ec->ec_exp, oa, eco->eco_lsm,
+ &dummy_oti, NULL);
if (rc == 0)
eco->eco_deleted = 1;
echo_put_object(eco);
case OBD_IOC_GETATTR:
rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
if (rc == 0) {
- rc = obd_getattr(&ec->ec_conn, &data->ioc_obdo1,
- eco->eco_lsm);
+ struct obd_info oinfo = { { { 0 } } };
+ oinfo.oi_md = eco->eco_lsm;
+ oinfo.oi_oa = &data->ioc_obdo1;
+ rc = obd_getattr(ec->ec_exp, &oinfo);
echo_put_object(eco);
}
GOTO(out, rc);
rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
if (rc == 0) {
- rc = obd_setattr(&ec->ec_conn, &data->ioc_obdo1,
- eco->eco_lsm, NULL);
+ struct obd_info oinfo = { { { 0 } } };
+ oinfo.oi_oa = &data->ioc_obdo1;
+ oinfo.oi_md = eco->eco_lsm;
+
+ rc = obd_setattr(ec->ec_exp, &oinfo, NULL);
echo_put_object(eco);
}
GOTO(out, rc);
- case OBD_IOC_OPEN:
- rc = echo_open (exp, &data->ioc_obdo1);
- GOTO(out, rc);
-
- case OBD_IOC_CLOSE:
- rc = echo_close (exp, &data->ioc_obdo1);
- GOTO(out, rc);
-
case OBD_IOC_BRW_WRITE:
if (!capable (CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
rw = OBD_BRW_WRITE;
/* fall through */
case OBD_IOC_BRW_READ:
- rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
- if (rc == 0) {
- if (data->ioc_pbuf2 == NULL) // NULL user data pointer
- rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
- eco->eco_lsm,
- data->ioc_offset,
- data->ioc_count);
- else
-#ifdef __KERNEL__
- rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
- eco->eco_lsm,
- data->ioc_offset,
- data->ioc_count,
- data->ioc_pbuf2);
-#endif
- echo_put_object(eco);
- }
+ rc = echo_client_brw_ioctl(rw, exp, data);
GOTO(out, rc);
case ECHO_IOC_GET_STRIPE:
} else {
rc = echo_create_object(obd, 0, &data->ioc_obdo1,
data->ioc_pbuf1,
- data->ioc_plen1);
+ data->ioc_plen1, &dummy_oti);
}
GOTO (out, rc);
if (!capable (CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
- rc = echo_enqueue (exp, &data->ioc_obdo1,
- data->ioc_conn1, /* lock mode */
+ rc = echo_client_enqueue(exp, &data->ioc_obdo1,
+ data->ioc_conn1, /* lock mode */
data->ioc_offset, data->ioc_count);/*extent*/
GOTO (out, rc);
case ECHO_IOC_CANCEL:
- rc = echo_cancel (exp, &data->ioc_obdo1);
+ rc = echo_client_cancel(exp, &data->ioc_obdo1);
GOTO (out, rc);
default:
EXIT;
out:
- class_export_put(exp);
+
+ /* XXX this should be in a helper also called by target_send_reply */
+ for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
+ i++, ack_lock++) {
+ if (!ack_lock->mode)
+ break;
+ ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
+ }
+
+ lock_kernel();
+
return rc;
}
-static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
+static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
{
- struct obd_ioctl_data* data = buf;
struct echo_client_obd *ec = &obddev->u.echo_client;
struct obd_device *tgt;
- struct lov_stripe_md *lsm = NULL;
+ struct lustre_handle conn = {0, };
struct obd_uuid echo_uuid = { "ECHO_UUID" };
+ struct obd_connect_data *ocd = NULL;
int rc;
ENTRY;
- if (data->ioc_inllen1 < 1) {
+ if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
CERROR("requires a TARGET OBD name\n");
RETURN(-EINVAL);
}
- tgt = class_name2obd(data->ioc_inlbuf1);
+ tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
- CERROR("device not attached or not set up (%d/%s)\n",
- data->ioc_dev, data->ioc_inlbuf1);
+ CERROR("device not attached or not set up (%s)\n",
+ lustre_cfg_string(lcfg, 1));
RETURN(-EINVAL);
}
spin_lock_init (&ec->ec_lock);
- INIT_LIST_HEAD (&ec->ec_objects);
+ CFS_INIT_LIST_HEAD (&ec->ec_objects);
ec->ec_unique = 0;
- rc = obd_connect(&ec->ec_conn, tgt, &echo_uuid);
- if (rc) {
- CERROR("fail to connect to device %d\n", data->ioc_dev);
- return (rc);
+ OBD_ALLOC(ocd, sizeof(*ocd));
+ if (ocd == NULL) {
+ CERROR("Can't alloc ocd connecting to %s\n",
+ lustre_cfg_string(lcfg, 1));
+ return -ENOMEM;
}
- ec->ec_lsmsize = obd_alloc_memmd (&ec->ec_conn, &lsm);
- if (ec->ec_lsmsize < 0) {
- CERROR ("Can't get # stripes: %d\n", rc);
- obd_disconnect (&ec->ec_conn, 0);
- rc = ec->ec_lsmsize;
- } else {
- ec->ec_nstripes = lsm->lsm_stripe_count;
- obd_free_memmd (&ec->ec_conn, &lsm);
+ ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL;
+ ocd->ocd_version = LUSTRE_VERSION_CODE;
+ ocd->ocd_group = FILTER_GROUP_ECHO;
+
+ rc = obd_connect(NULL, &conn, tgt, &echo_uuid, ocd, NULL);
+
+ OBD_FREE(ocd, sizeof(*ocd));
+
+ if (rc != 0) {
+ CERROR("fail to connect to device %s\n",
+ lustre_cfg_string(lcfg, 1));
+ return (rc);
}
+ ec->ec_exp = class_conn2export(&conn);
RETURN(rc);
}
-static int echo_cleanup(struct obd_device *obddev, int flags)
+static int echo_client_cleanup(struct obd_device *obddev)
{
struct list_head *el;
struct ec_object *eco;
echo_put_object(eco);
}
- rc = obd_disconnect(&ec->ec_conn, 0);
+ rc = obd_disconnect(ec->ec_exp);
if (rc != 0)
CERROR("fail to disconnect device: %d\n", rc);
RETURN(rc);
}
-static int echo_connect(struct lustre_handle *conn, struct obd_device *src,
- struct obd_uuid *cluuid)
+static int echo_client_connect(const struct lu_env *env,
+ struct lustre_handle *conn,
+ struct obd_device *src, struct obd_uuid *cluuid,
+ struct obd_connect_data *data, void *localdata)
{
struct obd_export *exp;
int rc;
+ ENTRY;
rc = class_connect(conn, src, cluuid);
if (rc == 0) {
- exp = class_conn2export (conn);
- INIT_LIST_HEAD(&exp->exp_ec_data.eced_open_head);
- INIT_LIST_HEAD(&exp->exp_ec_data.eced_locks);
+ exp = class_conn2export(conn);
+ CFS_INIT_LIST_HEAD(&exp->exp_ec_data.eced_locks);
class_export_put(exp);
}
RETURN (rc);
}
-static int echo_disconnect(struct lustre_handle *conn, int flags)
+static int echo_client_disconnect(struct obd_export *exp)
{
- struct obd_export *exp = class_conn2export (conn);
struct obd_device *obd;
struct echo_client_obd *ec;
- struct ec_open_object *ecoo;
struct ec_lock *ecl;
int rc;
+ ENTRY;
if (exp == NULL)
GOTO(out, rc = -EINVAL);
struct ec_lock, ecl_exp_chain);
list_del (&ecl->ecl_exp_chain);
- rc = obd_cancel (&ec->ec_conn, ecl->ecl_object->eco_lsm,
+ rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
ecl->ecl_mode, &ecl->ecl_lock_handle);
- CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect (%d)\n",
- ecl->ecl_object->eco_id, rc);
+ CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
+ "(%d)\n", ecl->ecl_object->eco_id, rc);
echo_put_object (ecl->ecl_object);
OBD_FREE (ecl, sizeof (*ecl));
}
- /* no more contention on export's open handle list */
- while (!list_empty (&exp->exp_ec_data.eced_open_head)) {
- ecoo = list_entry (exp->exp_ec_data.eced_open_head.next,
- struct ec_open_object, ecoo_exp_chain);
- list_del (&ecoo->ecoo_exp_chain);
-
- memcpy (&ecoo->ecoo_oa.o_inline, &ecoo->ecoo_och,
- FD_OSTDATA_SIZE);
- ecoo->ecoo_oa.o_valid |= OBD_MD_FLHANDLE;
-
- rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
- ecoo->ecoo_object->eco_lsm, NULL);
-
- CDEBUG (D_INFO, "Closed object "LPX64" on disconnect (%d)\n",
- ecoo->ecoo_oa.o_id, rc);
-
- echo_put_object (ecoo->ecoo_object);
- OBD_FREE (ecoo, sizeof (*ecoo));
- }
-
- rc = class_disconnect (conn, 0);
+ rc = class_disconnect(exp);
GOTO(out, rc);
out:
- class_export_put(exp);
return rc;
}
static struct obd_ops echo_obd_ops = {
- o_owner: THIS_MODULE,
- o_setup: echo_setup,
- o_cleanup: echo_cleanup,
- o_iocontrol: echo_iocontrol,
- o_connect: echo_connect,
- o_disconnect: echo_disconnect
+ .o_owner = THIS_MODULE,
+ .o_setup = echo_client_setup,
+ .o_cleanup = echo_client_cleanup,
+ .o_iocontrol = echo_client_iocontrol,
+ .o_connect = echo_client_connect,
+ .o_disconnect = echo_client_disconnect
};
int echo_client_init(void)
{
- struct lprocfs_static_vars lvars;
+ struct lprocfs_static_vars lvars = { 0 };
- lprocfs_init_vars(echo, &lvars);
- return class_register_type(&echo_obd_ops, lvars.module_vars,
- OBD_ECHO_CLIENT_DEVICENAME);
+ lprocfs_echo_init_vars(&lvars);
+ return class_register_type(&echo_obd_ops, NULL, lvars.module_vars,
+ LUSTRE_ECHO_CLIENT_NAME, NULL);
}
-void echo_client_cleanup(void)
+void echo_client_exit(void)
{
- class_unregister_type(OBD_ECHO_CLIENT_DEVICENAME);
+ class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
}