+static void
+echo_free_object (struct ec_object *eco)
+{
+ struct obd_device *obd = eco->eco_device;
+ struct echo_client_obd *ec = &obd->u.echo_client;
+
+ LASSERT (eco->eco_refcount == 0);
+ if (!eco->eco_lsm)
+ CERROR("No object %s\n", obd->obd_name);
+ else
+ obd_free_memmd(ec->ec_exp, &eco->eco_lsm);
+ OBD_FREE (eco, sizeof (*eco));
+}
+
+static int echo_create_object(struct obd_device *obd, int on_target,
+ struct obdo *oa, void *ulsm, int ulsm_nob,
+ struct obd_trans_info *oti)
+{
+ struct echo_client_obd *ec = &obd->u.echo_client;
+ struct ec_object *eco2;
+ struct ec_object *eco;
+ struct lov_stripe_md *lsm;
+ int rc;
+ int i, idx;
+
+ if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
+ (on_target || /* set_stripe */
+ ec->ec_nstripes != 0)) { /* LOV */
+ CERROR ("No valid oid\n");
+ return (-EINVAL);
+ }
+
+ if (ulsm != NULL) {
+ eco = echo_allocate_object (obd);
+ if (eco == NULL)
+ return (-ENOMEM);
+
+ lsm = eco->eco_lsm;
+
+ rc = echo_copyin_lsm (obd, lsm, ulsm, ulsm_nob);
+ if (rc != 0)
+ goto failed;
+
+ /* setup object ID here for !on_target and LOV hint */
+ if ((oa->o_valid & OBD_MD_FLID) != 0)
+ eco->eco_id = lsm->lsm_object_id = oa->o_id;
+
+ if (lsm->lsm_stripe_count == 0)
+ lsm->lsm_stripe_count = ec->ec_nstripes;
+
+ if (lsm->lsm_stripe_size == 0)
+ lsm->lsm_stripe_size = CFS_PAGE_SIZE;
+
+ idx = ll_rand();
+
+ /* setup stripes: indices + default ids if required */
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ if (lsm->lsm_oinfo[i]->loi_id == 0)
+ lsm->lsm_oinfo[i]->loi_id = lsm->lsm_object_id;
+
+ lsm->lsm_oinfo[i]->loi_ost_idx =
+ (idx + i) % ec->ec_nstripes;
+ }
+ } else {
+ OBD_ALLOC(eco, sizeof(*eco));
+ if (!eco)
+ return (-ENOMEM);
+ eco->eco_device = obd;
+ lsm = NULL;
+ }
+
+ if (oa->o_id == 0)
+ oa->o_id = ++last_object_id;
+
+ if (on_target) {
+ oa->o_gr = FILTER_GROUP_ECHO;
+ oa->o_valid |= OBD_MD_FLGROUP;
+
+ rc = obd_create(ec->ec_exp, oa, &lsm, oti);
+ if (rc != 0)
+ goto failed;
+
+ /* See what object ID we were given */
+ eco->eco_id = oa->o_id = lsm->lsm_object_id;
+ oa->o_valid |= OBD_MD_FLID;
+
+ LASSERT(eco->eco_lsm == NULL || eco->eco_lsm == lsm);
+ eco->eco_lsm = lsm;
+ }
+
+ spin_lock (&ec->ec_lock);
+
+ eco2 = echo_find_object_locked (obd, oa->o_id);
+ if (eco2 != NULL) { /* conflict */
+ spin_unlock (&ec->ec_lock);
+
+ CERROR ("Can't create object id "LPX64": id already exists%s\n",
+ oa->o_id, on_target ? " (undoing create)" : "");
+
+ if (on_target)
+ obd_destroy(ec->ec_exp, oa, lsm, oti, NULL);
+
+ rc = -EEXIST;
+ goto failed;
+ }
+
+ list_add (&eco->eco_obj_chain, &ec->ec_objects);
+ spin_unlock (&ec->ec_lock);
+ CDEBUG (D_INFO,
+ "created %p: "LPX64"=%u#%u@%u refs %d del %d\n",
+ eco, eco->eco_id,
+ eco->eco_lsm->lsm_stripe_size,
+ eco->eco_lsm->lsm_stripe_count,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
+ eco->eco_refcount, eco->eco_deleted);
+ return (0);
+
+ failed:
+ echo_free_object (eco);
+ if (rc)
+ CERROR("%s: err %d on create\n", obd->obd_name, rc);
+ return (rc);
+}
+
+static int
+echo_get_object (struct ec_object **ecop, struct obd_device *obd,
+ struct obdo *oa)
+{
+ struct echo_client_obd *ec = &obd->u.echo_client;
+ struct ec_object *eco;
+ struct ec_object *eco2;
+ int rc;
+
+ if ((oa->o_valid & OBD_MD_FLID) == 0 ||
+ oa->o_id == 0) /* disallow use of object id 0 */
+ {
+ CERROR ("No valid oid\n");
+ return (-EINVAL);
+ }
+
+ spin_lock (&ec->ec_lock);
+ eco = echo_find_object_locked (obd, oa->o_id);
+ if (eco != NULL) {
+ if (eco->eco_deleted) { /* being deleted */
+ spin_unlock(&ec->ec_lock);/* (see comment in cleanup) */
+ return (-EAGAIN);
+ }
+
+ eco->eco_refcount++;
+ spin_unlock (&ec->ec_lock);
+ *ecop = eco;
+ CDEBUG (D_INFO,
+ "found %p: "LPX64"=%u#%u@%u refs %d del %d\n",
+ eco, eco->eco_id,
+ eco->eco_lsm->lsm_stripe_size,
+ eco->eco_lsm->lsm_stripe_count,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
+ eco->eco_refcount, eco->eco_deleted);
+ return (0);
+ }
+ spin_unlock (&ec->ec_lock);
+
+ if (ec->ec_nstripes != 0) /* striping required */
+ return (-ENOENT);
+
+ eco = echo_allocate_object (obd);
+ if (eco == NULL)
+ return (-ENOMEM);
+
+ eco->eco_id = eco->eco_lsm->lsm_object_id = oa->o_id;
+
+ spin_lock (&ec->ec_lock);
+
+ eco2 = echo_find_object_locked (obd, oa->o_id);
+ if (eco2 == NULL) { /* didn't race */
+ list_add (&eco->eco_obj_chain, &ec->ec_objects);
+ spin_unlock (&ec->ec_lock);
+ eco->eco_refcount = 1;
+ *ecop = eco;
+ CDEBUG (D_INFO,
+ "created %p: "LPX64"=%u#%u@%d refs %d del %d\n",
+ eco, eco->eco_id,
+ eco->eco_lsm->lsm_stripe_size,
+ eco->eco_lsm->lsm_stripe_count,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
+ eco->eco_refcount, eco->eco_deleted);
+ return (0);
+ }
+
+ if (eco2->eco_deleted)
+ rc = -EAGAIN; /* lose race */
+ else {
+ eco2->eco_refcount++; /* take existing */
+ *ecop = eco2;
+ rc = 0;
+ LASSERT (eco2->eco_id == eco2->eco_lsm->lsm_object_id);
+ CDEBUG (D_INFO,
+ "found(2) %p: "LPX64"=%u#%u@%d refs %d del %d\n",
+ eco2, eco2->eco_id,
+ eco2->eco_lsm->lsm_stripe_size,
+ eco2->eco_lsm->lsm_stripe_count,
+ eco2->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
+ eco2->eco_refcount, eco2->eco_deleted);
+ }
+
+ spin_unlock (&ec->ec_lock);
+
+ echo_free_object (eco);
+ return (rc);
+}
+
+static void
+echo_put_object (struct ec_object *eco)
+{
+ struct obd_device *obd = eco->eco_device;
+ struct echo_client_obd *ec = &obd->u.echo_client;
+
+ /* Release caller's ref on the object.
+ * delete => mark for deletion when last ref goes
+ */
+
+ spin_lock (&ec->ec_lock);
+
+ eco->eco_refcount--;
+ LASSERT (eco->eco_refcount >= 0);
+
+ CDEBUG(D_INFO, "put %p: "LPX64"=%u#%u@%d refs %d del %d\n",
+ eco, eco->eco_id,
+ eco->eco_lsm->lsm_stripe_size,
+ eco->eco_lsm->lsm_stripe_count,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
+ eco->eco_refcount, eco->eco_deleted);
+
+ if (eco->eco_refcount != 0 || !eco->eco_deleted) {
+ spin_unlock (&ec->ec_lock);
+ return;
+ }
+
+ spin_unlock (&ec->ec_lock);
+
+ /* NB leave obj in the object list. We must prevent anyone from
+ * attempting to enqueue on this object number until we can be
+ * sure there will be no more lock callbacks.
+ */
+ obd_cancel_unused(ec->ec_exp, eco->eco_lsm, 0, NULL);
+
+ /* now we can let it go */
+ spin_lock (&ec->ec_lock);
+ list_del (&eco->eco_obj_chain);
+ spin_unlock (&ec->ec_lock);
+
+ LASSERT (eco->eco_refcount == 0);
+
+ echo_free_object (eco);
+}
+
+static void
+echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
+{
+ unsigned long stripe_count;
+ unsigned long stripe_size;
+ unsigned long width;
+ unsigned long woffset;
+ int stripe_index;
+ obd_off offset;
+
+ if (lsm->lsm_stripe_count <= 1)
+ return;
+
+ offset = *offp;
+ stripe_size = lsm->lsm_stripe_size;
+ stripe_count = lsm->lsm_stripe_count;
+
+ /* width = # bytes in all stripes */
+ width = stripe_size * stripe_count;
+
+ /* woffset = offset within a width; offset = whole number of widths */
+ woffset = do_div (offset, width);
+
+ stripe_index = woffset / stripe_size;
+
+ *idp = lsm->lsm_oinfo[stripe_index]->loi_id;
+ *offp = offset * stripe_size + woffset % stripe_size;
+}
+
+static void
+echo_client_page_debug_setup(struct lov_stripe_md *lsm,
+ cfs_page_t *page, int rw, obd_id id,
+ obd_off offset, obd_off count)
+{
+ char *addr;
+ obd_off stripe_off;
+ obd_id stripe_id;
+ int delta;
+
+ /* no partial pages on the client */
+ LASSERT(count == CFS_PAGE_SIZE);
+
+ addr = cfs_kmap(page);
+
+ for (delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+ if (rw == OBD_BRW_WRITE) {
+ stripe_off = offset + delta;
+ stripe_id = id;
+ echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
+ } else {
+ stripe_off = 0xdeadbeef00c0ffeeULL;
+ stripe_id = 0xdeadbeef00c0ffeeULL;