#else
#include <liblustre.h>
#endif
+
#include <linux/ext2_fs.h>
#include <lustre/lustre_idl.h>
if (ev == OBD_NOTIFY_OCD) {
struct obd_connect_data *conn_data =
&watched->u.cli.cl_import->imp_connect_data;
-
+
/* Set connect data to desired target, update
* exp_connect_flags. */
rc = lmv_set_mdc_data(lmv, uuid, conn_data);
LASSERT(mds != NULL);
- /*
+ /*
* Allocate new fid on target according to operation type and parent
* home mds.
*/
obj = lmv_obj_grab(obd, &op_data->op_fid1);
if (obj != NULL || op_data->op_name == NULL ||
op_data->op_opc != LUSTRE_OPC_MKDIR) {
- /*
+ /*
* Allocate fid for non-dir or for null name or for case parent
* dir is split.
*/
if (obj) {
lmv_obj_put(obj);
- /*
+ /*
* If we have this flag turned on, and we see that
* parent dir is split, this means, that caller did not
* notice split yet. This is race and we would like to
if (op_data->op_bias & MDS_CHECK_SPLIT)
RETURN(-ERESTART);
}
-
+
/*
* Allocate new fid on same mds where parent fid is located. In
* case of split dir, ->op_fid1 here will contain fid of slave
if (rc)
GOTO(out, rc);
} else {
- /*
+ /*
* Parent directory is not split and we want to create a
* directory in it. Let's calculate where to place it according
* to name.
CDEBUG(D_OTHER, "created - "DFID"\n", PFID(&op_data->op_fid1));
} else if (rc == -ERESTART) {
LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
+ DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
"Got -ERESTART during create!\n");
ptlrpc_req_finished(*request);
*request = NULL;
-
+
/*
* Directory got split. Time to update local object and repeat
* the request with proper MDS.
memset(op_data2, 0, sizeof(*op_data2));
op_data2->op_fid1 = mea->mea_ids[i];
op_data2->op_bias = 0;
-
+
tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
if (IS_ERR(tgt_exp))
GOTO(cleanup, rc = PTR_ERR(tgt_exp));
obj = lmv_obj_grab(obd, &rid);
if (obj) {
int mea_idx;
-
+
/* Directory is split. Look for right mds for this name */
mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
filename, namelen - 1);
}
} else if (rc == -ERESTART) {
LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
+ DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
"Got -ERESTART during getattr!\n");
ptlrpc_req_finished(*request);
*request = NULL;
-
+
/*
* Directory got split. Time to update local object and repeat
* the request with proper MDS.
LASSERT(++loop <= 2);
if (op_data->op_namelen != 0) {
int mea_idx;
-
+
/* Usual link request */
obj = lmv_obj_grab(obd, &op_data->op_fid2);
if (obj) {
rc = md_link(lmv->tgts[mds].ltd_exp, op_data, request);
if (rc == -ERESTART) {
LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
+ DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
"Got -ERESTART during link!\n");
ptlrpc_req_finished(*request);
*request = NULL;
-
+
/*
* Directory got split. Time to update local object and repeat
* the request with proper MDS.
CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
lmv_obj_put(obj);
}
-
+
request:
op_data->op_fsuid = current->fsuid;
op_data->op_fsgid = current->fsgid;
new, newlen, request);
if (rc == -ERESTART) {
LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
+ DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
"Got -ERESTART during rename!\n");
ptlrpc_req_finished(*request);
*request = NULL;
-
+
/*
* Directory got split. Time to update local object and repeat
* the request with proper MDS.
RETURN(0);
}
+static void lmv_hash_adjust(__u32 *hash, __u32 hash_adj)
+{
+ __u32 val;
+ /*
+ * Rewrite the little-endian hash stored at *hash as (hash - hash_adj),
+ * stored back in little-endian form. The values 0 and DIR_END_OFF are
+ * left untouched. Both val and hash_adj are __u32, so the subtraction
+ * is carried out in unsigned 32-bit arithmetic.
+ */
+ val = le32_to_cpu(*hash);
+ if (val != 0 && val != DIR_END_OFF)
+ *hash = cpu_to_le32(val - hash_adj);
+}
+
+static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
+{
+ __u64 id;
+ struct obd_import *imp;
+
+ /*
+ * Derive a 32-bit rank from the export and the fid: c_self of the
+ * import connection is added to fid_flatten(fid) as a 64-bit sum, the
+ * upper 32 bits are XORed into the lower 32 bits, and the __u32 return
+ * type keeps only the low 32 bits of that value.
+ *
+ * XXX Hack: to obtain the nid we assume that the underlying obd device
+ * is an mdc.
+ *
+ * NOTE(review): imp and imp->imp_connection are dereferenced without a
+ * NULL check; confirm that callers guarantee both are set.
+ */
+ imp = class_exp2cliimp(exp);
+ id = imp->imp_connection->c_self + fid_flatten(fid);
+ return id ^ (id >> 32);
+}
+
static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
- struct obd_capa *oc, __u64 offset, struct page *page,
+ struct obd_capa *oc, __u64 offset64, struct page *page,
struct ptlrpc_request **request)
{
struct obd_device *obd = exp->exp_obd;
struct obd_export *tgt_exp;
struct lu_fid rid = *fid;
struct lmv_obj *obj;
- int i = 0, rc;
+ __u32 offset; /* 32-bit copy of offset64, later shifted by hash_adj */
+ __u32 hash_adj = 0; /* added to the request offset, subtracted from reply hashes */
+ __u32 rank = 0; /* lmv_node_rank() result reduced modulo nr */
+ __u32 seg_size = 0; /* MAX_HASH_SIZE / nr */
+ int tgt = 0; /* index into lo_inodes the request is forwarded to */
+ int tgt0 = 0; /* segment index computed from the unshifted offset */
+ int rc;
+ int nr = 0; /* obj->lo_objcount when an object was grabbed, else 0 */
ENTRY;
+ offset = offset64;
+ /*
+ * The hash offset is handled as a 32-bit value from here on; assert
+ * that narrowing offset64 into offset did not lose any bits.
+ */
+ LASSERT((__u64)offset == offset64);
+
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- CDEBUG(D_INFO, "READPAGE at %llu from "DFID"\n", offset, PFID(&rid));
+ CDEBUG(D_INFO, "READPAGE at %x from "DFID"\n", offset, PFID(&rid));
obj = lmv_obj_grab(obd, fid);
if (obj) {
- __u64 index = offset;
- __u64 seg = MAX_HASH_SIZE;
+ struct lmv_inode *loi;
+ /*
+ * An object was grabbed for fid. MAX_HASH_SIZE is divided into nr
+ * segments of seg_size each. tgt0 is the segment that the incoming
+ * offset falls into; tgt is tgt0 rotated by rank (modulo nr) and
+ * selects the lo_inodes entry whose fid and mds receive the request.
+ * rank comes from lmv_node_rank() on the export of loi[0].li_mds.
+ */
lmv_obj_lock(obj);
- LASSERT(obj->lo_objcount > 0);
- do_div(seg, obj->lo_objcount);
- do_div(index, (__u32)seg);
- i = (int)index;
- rid = obj->lo_inodes[i].li_fid;
- tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
+ nr = obj->lo_objcount;
+ LASSERT(nr > 0);
+ seg_size = MAX_HASH_SIZE / nr;
+ loi = obj->lo_inodes;
+ rank = lmv_node_rank(lmv_get_export(lmv, loi[0].li_mds),
+ fid) % nr;
+ tgt0 = (offset / seg_size) % nr;
+ tgt = (tgt0 + rank) % nr;
+
+ if (tgt < tgt0)
+ /*
+ * Wrap around.
+ *
+ * Last segment has unusual length due to division
+ * rounding.
+ */
+ hash_adj = MAX_HASH_SIZE - seg_size * nr;
+ else
+ hash_adj = 0;
+ /* On top of any wrap-around correction, shift by rank whole segments. */
+ hash_adj += rank * seg_size;
- lmv_obj_unlock(obj);
+ CDEBUG(D_INFO, "hash_adj: %x %x %x/%x -> %x/%x\n",
+ rank, hash_adj, offset, tgt0, offset + hash_adj, tgt);
+ /* Offset sent to the target: incoming offset plus hash_adj, modulo MAX_HASH_SIZE. */
+ offset = (offset + hash_adj) % MAX_HASH_SIZE;
+ rid = obj->lo_inodes[tgt].li_fid;
+ tgt_exp = lmv_get_export(lmv, loi[tgt].li_mds);
CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n",
- PFID(&rid), (unsigned long)offset, i);
- } else {
+ PFID(&rid), (unsigned long)offset, tgt);
+ } else
tgt_exp = lmv_find_export(lmv, &rid);
- }
if (IS_ERR(tgt_exp))
GOTO(cleanup, rc = PTR_ERR(tgt_exp));
rc = md_readpage(tgt_exp, &rid, oc, offset, page, request);
if (rc)
GOTO(cleanup, rc);
- if (obj && i < obj->lo_objcount - 1) {
+ if (obj) {
+ __u32 hend;
struct lu_dirpage *dp;
- __u32 end;
- (void)cfs_kmap(page);
- dp = cfs_page_address(page);
- end = le32_to_cpu(dp->ldp_hash_end);
- if (end == DIR_END_OFF) {
- __u64 max_hash = MAX_HASH_SIZE;
-
- do_div(max_hash, obj->lo_objcount);
- dp->ldp_hash_end = (__u32)max_hash * (i + 1);
- CDEBUG(D_INFO, ""DFID" reset end %lu i %d\n", PFID(&rid),
- (unsigned long)dp->ldp_hash_end, i);
+ struct lu_dirent *ent;
+ /*
+ * Map the returned page and subtract hash_adj from the hashes it
+ * carries: the page start and end hashes and the hash of every
+ * entry. lmv_hash_adjust() leaves 0 and DIR_END_OFF unchanged, so
+ * an end-of-directory marker survives for the check further down.
+ */
+ dp = cfs_kmap(page);
+ /* NOTE(review): hend is assigned but not read in the lines visible here. */
+ hend = le32_to_cpu(dp->ldp_hash_end);
+ lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
+ lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
+
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent))
+ lmv_hash_adjust(&ent->lde_hash, hash_adj);
+ /*
+ * If the request was not for the last segment and the target
+ * reported DIR_END_OFF, the end hash is replaced with
+ * seg_size * (tgt0 + 1), the first hash of the next segment;
+ * presumably so that the caller continues with the next
+ * segment (TODO confirm).
+ */
+ if (tgt0 != nr - 1) {
+ __u32 end;
+
+ end = le32_to_cpu(dp->ldp_hash_end);
+ if (end == DIR_END_OFF) {
+ dp->ldp_hash_end = cpu_to_le32(seg_size *
+ (tgt0 + 1));
+ CDEBUG(D_INFO, ""DFID" reset end %x tgt %d\n",
+ PFID(&rid),
+ le32_to_cpu(dp->ldp_hash_end), tgt);
+ }
}
cfs_kunmap(page);
}
*/
EXIT;
cleanup:
- if (obj)
+ if (obj) {
+ lmv_obj_unlock(obj); /* NOTE(review): lock taken before the target was chosen is released only here, so it is held across md_readpage(); confirm this is intended */
lmv_obj_put(obj);
+ }
return rc;
}
op_data2->op_fsuid = current->fsuid;
op_data2->op_fsgid = current->fsgid;
op_data2->op_bias = 0;
-
+
LASSERT(mea != NULL);
for (i = 0; i < mea->mea_count; i++) {
memset(op_data2, 0, sizeof(*op_data2));
op_data->op_fsuid = current->fsuid;
op_data->op_fsgid = current->fsgid;
op_data->op_cap = current->cap_effective;
-
+
rc = md_unlink(tgt_exp, op_data, request);
if (rc == -ERESTART) {
LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
+ DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
"Got -ERESTART during unlink!\n");
ptlrpc_req_finished(*request);
*request = NULL;
-
+
/*
* Directory got split. Time to update local object and repeat
* the request with proper MDS.