static void seq_server_proc_fini(struct lu_server_seq *seq);
/* Assigns client to sequence controller node. */
-int seq_server_set_cli(struct lu_server_seq *seq,
- struct lu_client_seq *cli,
- const struct lu_env *env)
+int seq_server_set_cli(const struct lu_env *env, struct lu_server_seq *seq,
+ struct lu_client_seq *cli)
{
int rc = 0;
ENTRY;
* flaged as sync write op.
*/
static int range_alloc_set(const struct lu_env *env,
- struct lu_seq_range *out,
- struct lu_server_seq *seq)
+ struct lu_seq_range *out,
+ struct lu_server_seq *seq)
{
struct lu_seq_range *space = &seq->lss_space;
struct lu_seq_range *loset = &seq->lss_lowater_set;
/* Saving new range to allocation space. */
*space = seq->lss_cli->lcs_space;
LASSERT(range_is_sane(space));
+ if (seq->lss_cli->lcs_srv == NULL) {
+ struct lu_server_fld *fld;
+
+ /* Insert it to the local FLDB */
+ fld = seq->lss_site->ss_server_fld;
+ mutex_lock(&fld->lsf_lock);
+ rc = fld_insert_entry(env, fld, space);
+ mutex_unlock(&fld->lsf_lock);
+ }
}
rc = range_alloc_set(env, out, seq);
#endif /* LPROCFS */
}
-int seq_server_init(struct lu_server_seq *seq,
+int seq_server_init(const struct lu_env *env,
+ struct lu_server_seq *seq,
struct dt_device *dev,
const char *prefix,
enum lu_mgr_type type,
- struct seq_server_site *ss,
- const struct lu_env *env)
+ struct seq_server_site *ss)
{
int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
ENTRY;
int rc;
ENTRY;
+ LASSERT(exp != NULL && !IS_ERR(exp));
req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
LUSTRE_MDS_VERSION, SEQ_QUERY);
if (req == NULL)
ptlrpc_at_set_req_timeout(req);
- if (seq->lcs_type == LUSTRE_SEQ_METADATA)
+ if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA)
mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+
rc = ptlrpc_queue_wait(req);
- if (seq->lcs_type == LUSTRE_SEQ_METADATA)
+
+ if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA)
mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
if (rc)
GOTO(out_req, rc);
if (exp != NULL)
seq->lcs_exp = class_export_get(exp);
- else if (type == LUSTRE_SEQ_METADATA)
- LASSERT(seq->lcs_srv != NULL);
snprintf(seq->lcs_name, sizeof(seq->lcs_name),
"cli-%s", prefix);
EXPORT_SYMBOL(fld_server_create);
/**
- * Lookup mds by seq, returns a range for given seq.
- *
- * If that entry is not cached in fld cache, request is sent to super
- * sequence controller node (MDT0). All other MDT[1...N] and client
- * cache fld entries, but this cache is not persistent.
- */
-int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
- seqno_t seq, struct lu_seq_range *range)
+ * Extract index information from fld name like srv-fsname-MDT0000
+ **/
+int fld_name_to_index(const char *name, __u32 *index)
+{
+ char *dash;
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_INFO, "get index from %s\n", name);
+ dash = strrchr(name, '-');
+ if (dash == NULL)
+ RETURN(-EINVAL);
+ dash++;
+ rc = target_name2index(dash, index, NULL);
+ RETURN(rc);
+}
+
+/**
+ * Retrieve fldb entry from MDT0 and add to local FLDB and cache.
+ **/
+int fld_update_from_controller(const struct lu_env *env,
+ struct lu_server_fld *fld)
+{
+ struct fld_thread_info *info;
+ struct lu_seq_range *range;
+ struct lu_seq_range_array *lsra;
+ __u32 index;
+ struct ptlrpc_request *req;
+ int rc;
+ int i;
+ ENTRY;
+
+ /* Update only happens during initalization, i.e. local FLDB
+ * does not exist yet */
+ if (!fld->lsf_new)
+ RETURN(0);
+
+ rc = fld_name_to_index(fld->lsf_name, &index);
+ if (rc < 0)
+ RETURN(rc);
+
+ /* No need update fldb for MDT0 */
+ if (index == 0)
+ RETURN(0);
+
+ info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+ LASSERT(info != NULL);
+ range = &info->fti_lrange;
+ memset(range, 0, sizeof(*range));
+ range->lsr_index = index;
+ fld_range_set_mdt(range);
+
+ do {
+ rc = fld_client_rpc(fld->lsf_control_exp, range, FLD_READ,
+ &req);
+ if (rc != 0 && rc != -EAGAIN)
+ GOTO(out, rc);
+
+ LASSERT(req != NULL);
+ lsra = (struct lu_seq_range_array *)req_capsule_server_get(
+ &req->rq_pill, &RMF_GENERIC_DATA);
+ if (lsra == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ range_array_le_to_cpu(lsra, lsra);
+ for (i = 0; i < lsra->lsra_count; i++) {
+ int rc1;
+
+ if (lsra->lsra_lsr[i].lsr_flags != LU_SEQ_RANGE_MDT)
+ GOTO(out, rc = -EINVAL);
+
+ if (lsra->lsra_lsr[i].lsr_index != index)
+ GOTO(out, rc = -EINVAL);
+
+ rc1 = fld_insert_entry(env, fld, &lsra->lsra_lsr[i]);
+ if (rc1 != 0)
+ GOTO(out, rc = rc1);
+ }
+ if (rc == -EAGAIN)
+ *range = lsra->lsra_lsr[lsra->lsra_count - 1];
+ } while (rc == -EAGAIN);
+
+ fld->lsf_new = 1;
+out:
+ if (req != NULL)
+ ptlrpc_req_finished(req);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(fld_update_from_controller);
+
+/**
+ * Lookup sequece in local cache/fldb.
+ **/
+int fld_local_lookup(const struct lu_env *env, struct lu_server_fld *fld,
+ seqno_t seq, struct lu_seq_range *range)
{
struct lu_seq_range *erange;
struct fld_thread_info *info;
*range = *erange;
RETURN(0);
}
+ RETURN(rc);
+}
+EXPORT_SYMBOL(fld_local_lookup);
- if (fld->lsf_obj) {
+/**
+ * Lookup MDT/OST by seq, returns a range for given seq.
+ *
+ * If that entry is not cached in fld cache, request is sent to super
+ * sequence controller node (MDT0). All other MDT[1...N] and client
+ * cache fld entries, but this cache is not persistent.
+ */
+int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
+ seqno_t seq, struct lu_seq_range *range)
+{
+ __u32 index;
+ int rc;
+ ENTRY;
+
+ rc = fld_local_lookup(env, fld, seq, range);
+ if (likely(rc == 0))
+ RETURN(rc);
+
+ rc = fld_name_to_index(fld->lsf_name, &index);
+ if (rc < 0)
+ RETURN(rc);
+ else
+ rc = 0;
+
+ if (index == 0) {
/* On server side, all entries should be in cache.
* If we can not find it in cache, just return error */
CERROR("%s: Cannot find sequence "LPX64": rc = %d\n",
*/
range->lsr_start = seq;
rc = fld_client_rpc(fld->lsf_control_exp,
- range, FLD_LOOKUP);
+ range, FLD_QUERY, NULL);
if (rc == 0)
fld_cache_insert(fld->lsf_cache, range);
}
* All MDT server handle fld lookup operation. But only MDT0 has fld index.
* if entry is not found in cache we need to forward lookup request to MDT0
*/
-static int fld_server_handle(struct lu_server_fld *fld,
- const struct lu_env *env,
- __u32 opc, struct lu_seq_range *range)
+static int fld_handle_lookup(struct tgt_session_info *tsi)
{
- int rc;
+ struct obd_export *exp = tsi->tsi_exp;
+ struct lu_site *site = exp->exp_obd->obd_lu_dev->ld_site;
+ struct lu_server_fld *fld;
+ struct lu_seq_range *in;
+ struct lu_seq_range *out;
+ int rc;
ENTRY;
- switch (opc) {
- case FLD_LOOKUP:
- rc = fld_server_lookup(env, fld, range->lsr_start, range);
- break;
- default:
- rc = -EINVAL;
- break;
- }
+ in = req_capsule_client_get(tsi->tsi_pill, &RMF_FLD_MDFLD);
+ if (in == NULL)
+ RETURN(err_serious(-EPROTO));
+
+ rc = req_capsule_server_pack(tsi->tsi_pill);
+ if (unlikely(rc != 0))
+ RETURN(err_serious(rc));
+
+ out = req_capsule_server_get(tsi->tsi_pill, &RMF_FLD_MDFLD);
+ if (out == NULL)
+ RETURN(err_serious(-EPROTO));
+ *out = *in;
+
+ fld = lu_site2seq(site)->ss_server_fld;
- CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
- DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range));
+ rc = fld_server_lookup(tsi->tsi_env, fld, in->lsr_start, out);
+
+ CDEBUG(D_INFO, "%s: FLD req handle: error %d (range: "DRANGE")\n",
+ fld->lsf_name, rc, PRANGE(out));
RETURN(rc);
}
-static int fld_handler(struct tgt_session_info *tsi)
+static int fld_handle_read(struct tgt_session_info *tsi)
{
struct obd_export *exp = tsi->tsi_exp;
struct lu_site *site = exp->exp_obd->obd_lu_dev->ld_site;
struct lu_seq_range *in;
- struct lu_seq_range *out;
- int rc;
- __u32 *opc;
+ void *data;
+ int rc;
ENTRY;
- opc = req_capsule_client_get(tsi->tsi_pill, &RMF_FLD_OPC);
- if (opc != NULL) {
- in = req_capsule_client_get(tsi->tsi_pill, &RMF_FLD_MDFLD);
- if (in == NULL)
- RETURN(err_serious(-EPROTO));
- out = req_capsule_server_get(tsi->tsi_pill, &RMF_FLD_MDFLD);
- if (out == NULL)
- RETURN(err_serious(-EPROTO));
- *out = *in;
-
- /* For old 2.0 client, the 'lsr_flags' is uninitialized.
- * Set it as 'LU_SEQ_RANGE_MDT' by default. */
- if (!(exp_connect_flags(exp) & OBD_CONNECT_64BITHASH) &&
- !(exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) &&
- !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) &&
- !exp->exp_libclient)
- fld_range_set_mdt(out);
-
- rc = fld_server_handle(lu_site2seq(site)->ss_server_fld,
- tsi->tsi_env, *opc, out);
- } else {
- rc = err_serious(-EPROTO);
- }
+ req_capsule_set(tsi->tsi_pill, &RQF_FLD_READ);
+
+ in = req_capsule_client_get(tsi->tsi_pill, &RMF_FLD_MDFLD);
+ if (in == NULL)
+ RETURN(err_serious(-EPROTO));
+
+ req_capsule_set_size(tsi->tsi_pill, &RMF_GENERIC_DATA, RCL_SERVER,
+ PAGE_CACHE_SIZE);
+
+ rc = req_capsule_server_pack(tsi->tsi_pill);
+ if (unlikely(rc != 0))
+ RETURN(err_serious(rc));
+
+ data = req_capsule_server_get(tsi->tsi_pill, &RMF_GENERIC_DATA);
+
+ rc = fld_server_read(tsi->tsi_env, lu_site2seq(site)->ss_server_fld,
+ in, data, PAGE_CACHE_SIZE);
+ RETURN(rc);
+}
+
+static int fld_handle_query(struct tgt_session_info *tsi)
+{
+ int rc;
+
+ ENTRY;
+
+ req_capsule_set(tsi->tsi_pill, &RQF_FLD_QUERY);
+
+ rc = fld_handle_lookup(tsi);
RETURN(rc);
}
#endif
int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
- struct dt_device *dt, const char *prefix, int mds_node_id,
- int type)
+ struct dt_device *dt, const char *prefix, int type)
{
int cache_size, cache_threshold;
int rc;
ENTRY;
- snprintf(fld->lsf_name, sizeof(fld->lsf_name),
- "srv-%s", prefix);
+ snprintf(fld->lsf_name, sizeof(fld->lsf_name), "srv-%s", prefix);
cache_size = FLD_SERVER_CACHE_SIZE / sizeof(struct fld_cache_entry);
RETURN(rc);
}
- if (!mds_node_id && type == LU_SEQ_RANGE_MDT) {
- rc = fld_index_init(env, fld, dt);
- if (rc)
- GOTO(out_cache, rc);
- } else {
- fld->lsf_obj = NULL;
- }
+ rc = fld_index_init(env, fld, dt);
+ if (rc)
+ GOTO(out_cache, rc);
rc = fld_server_proc_init(fld);
if (rc)
EXPORT_SYMBOL(fld_server_fini);
struct tgt_handler fld_handlers[] = {
-TGT_FLD_HDL(HABEO_REFERO, FLD_QUERY, fld_handler),
+TGT_FLD_HDL_VAR(0, FLD_QUERY, fld_handle_query),
+TGT_FLD_HDL_VAR(0, FLD_READ, fld_handle_read),
};
EXPORT_SYMBOL(fld_handlers);
struct dt_it *it;
const struct dt_it_ops *iops;
int rc;
+ __u32 index;
ENTRY;
info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
dof.dof_type = DFT_INDEX;
dof.u.dof_idx.di_feat = &fld_index_features;
- dt_obj = dt_find_or_create(env, dt, &fid, &dof, attr);
+ dt_obj = dt_locate(env, dt, &fid);
if (IS_ERR(dt_obj)) {
rc = PTR_ERR(dt_obj);
- CERROR("%s: Can't find \"%s\" obj %d\n", fld->lsf_name,
- fld_index_name, rc);
dt_obj = NULL;
GOTO(out, rc);
}
+ LASSERT(dt_obj != NULL);
+ if (!dt_object_exists(dt_obj)) {
+ lu_object_put(env, &dt_obj->do_lu);
+ dt_obj = dt_find_or_create(env, dt, &fid, &dof, attr);
+ fld->lsf_new = 1;
+ if (IS_ERR(dt_obj)) {
+ rc = PTR_ERR(dt_obj);
+ CERROR("%s: Can't find \"%s\" obj %d\n", fld->lsf_name,
+ fld_index_name, rc);
+ dt_obj = NULL;
+ GOTO(out, rc);
+ }
+ }
+
fld->lsf_obj = dt_obj;
rc = dt_obj->do_ops->do_index_try(env, dt_obj, &fld_index_features);
if (rc != 0) {
GOTO(out_it_put, rc);
rc = iops->next(env, it);
} while (rc == 0);
+ } else {
+ fld->lsf_new = 1;
}
- /* Note: fld_insert_entry will detect whether these
- * special entries already exist inside FLDB */
- mutex_lock(&fld->lsf_lock);
- rc = fld_insert_special_entries(env, fld);
- mutex_unlock(&fld->lsf_lock);
- if (rc != 0) {
- CERROR("%s: insert special entries failed!: rc = %d\n",
- fld->lsf_name, rc);
+ rc = fld_name_to_index(fld->lsf_name, &index);
+ if (rc < 0)
GOTO(out_it_put, rc);
- }
+ else
+ rc = 0;
+ if (index == 0) {
+ /* Note: fld_insert_entry will detect whether these
+ * special entries already exist inside FLDB */
+ mutex_lock(&fld->lsf_lock);
+ rc = fld_insert_special_entries(env, fld);
+ mutex_unlock(&fld->lsf_lock);
+ if (rc != 0) {
+ CERROR("%s: insert special entries failed!: rc = %d\n",
+ fld->lsf_name, rc);
+ GOTO(out_it_put, rc);
+ }
+ }
out_it_put:
iops->put(env, it);
out_it_fini:
if (attr != NULL)
OBD_FREE_PTR(attr);
- if (rc != 0) {
+ if (rc < 0) {
if (dt_obj != NULL)
lu_object_put(env, &dt_obj->do_lu);
fld->lsf_obj = NULL;
}
EXIT;
}
+
+int fld_server_read(const struct lu_env *env, struct lu_server_fld *fld,
+ struct lu_seq_range *range, void *data, int data_len)
+{
+ struct lu_seq_range_array *lsra = data;
+ struct fld_thread_info *info;
+ struct dt_object *dt_obj = fld->lsf_obj;
+ struct lu_seq_range *entry;
+ struct dt_it *it;
+ const struct dt_it_ops *iops;
+ int rc;
+
+ ENTRY;
+
+ lsra->lsra_count = 0;
+ iops = &dt_obj->do_index_ops->dio_it;
+ it = iops->init(env, dt_obj, 0, NULL);
+ if (IS_ERR(it))
+ RETURN(PTR_ERR(it));
+
+ rc = iops->load(env, it, range->lsr_end);
+ if (rc <= 0)
+ GOTO(out_it_fini, rc);
+
+ info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+ LASSERT(info != NULL);
+ entry = &info->fti_rec;
+ do {
+ rc = iops->rec(env, it, (struct dt_rec *)entry, 0);
+ if (rc != 0)
+ GOTO(out_it_put, rc);
+
+ if (offsetof(typeof(*lsra), lsra_lsr[lsra->lsra_count + 1]) >
+ data_len)
+ GOTO(out, rc = -EAGAIN);
+
+ range_be_to_cpu(entry, entry);
+ if (entry->lsr_index == range->lsr_index &&
+ entry->lsr_flags == range->lsr_flags &&
+ entry->lsr_start > range->lsr_start) {
+ lsra->lsra_lsr[lsra->lsra_count] = *entry;
+ lsra->lsra_count++;
+ }
+
+ rc = iops->next(env, it);
+ } while (rc == 0);
+ if (rc > 0)
+ rc = 0;
+out:
+ range_array_cpu_to_le(lsra, lsra);
+out_it_put:
+ iops->put(env, it);
+out_it_fini:
+ iops->fini(env, it);
+
+ RETURN(rc);
+}
unsigned int fci_no_shrink:1;
};
-enum fld_op {
- FLD_CREATE = 0,
- FLD_DELETE = 1,
- FLD_LOOKUP = 2
-};
-
enum {
/* 4M of FLD cache will not hurt client a lot. */
FLD_SERVER_CACHE_SIZE = (4 * 0x100000),
int fld_index_lookup(const struct lu_env *env, struct lu_server_fld *fld,
seqno_t seq, struct lu_seq_range *range);
+int fld_name_to_index(const char *name, __u32 *index);
int fld_server_mod_init(void);
void fld_server_mod_exit(void);
+int fld_server_read(const struct lu_env *env, struct lu_server_fld *fld,
+ struct lu_seq_range *range, void *data, int data_len);
# ifdef LPROCFS
extern const struct file_operations fld_proc_seq_fops;
extern struct lprocfs_vars fld_server_proc_list[];
# endif /* HAVE_SERVER_SUPPORT */
int fld_client_rpc(struct obd_export *exp,
- struct lu_seq_range *range, __u32 fld_op);
-
+ struct lu_seq_range *range, __u32 fld_op,
+ struct ptlrpc_request **reqp);
#endif /* __KERNEL__ */
struct fld_cache *fld_cache_init(const char *name,
EXPORT_SYMBOL(fld_client_fini);
int fld_client_rpc(struct obd_export *exp,
- struct lu_seq_range *range, __u32 fld_op)
+ struct lu_seq_range *range, __u32 fld_op,
+ struct ptlrpc_request **reqp)
{
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req = NULL;
struct lu_seq_range *prange;
__u32 *op;
- int rc;
+ int rc = 0;
struct obd_import *imp;
ENTRY;
LASSERT(exp != NULL);
imp = class_exp2cliimp(exp);
- req = ptlrpc_request_alloc_pack(imp, &RQF_FLD_QUERY, LUSTRE_MDS_VERSION,
- FLD_QUERY);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
- *op = fld_op;
+ switch (fld_op) {
+ case FLD_QUERY:
+ req = ptlrpc_request_alloc_pack(imp, &RQF_FLD_QUERY,
+ LUSTRE_MDS_VERSION, FLD_QUERY);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ /* XXX: only needed when talking to old server(< 2.6), it should
+ * be removed when < 2.6 server is not supported */
+ op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
+ *op = FLD_LOOKUP;
+
+ if (imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS)
+ req->rq_allow_replay = 1;
+ break;
+ case FLD_READ:
+ req = ptlrpc_request_alloc_pack(imp, &RQF_FLD_READ,
+ LUSTRE_MDS_VERSION, FLD_READ);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA,
+ RCL_SERVER, PAGE_CACHE_SIZE);
+ break;
+ default:
+ rc = -EINVAL;
+ break;
+ }
- prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
- *prange = *range;
+ if (rc != 0)
+ RETURN(rc);
- ptlrpc_request_set_replen(req);
+ prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
+ *prange = *range;
+ ptlrpc_request_set_replen(req);
req->rq_request_portal = FLD_REQUEST_PORTAL;
req->rq_reply_portal = MDC_REPLY_PORTAL;
ptlrpc_at_set_req_timeout(req);
- if (fld_op == FLD_LOOKUP &&
- imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS)
- req->rq_allow_replay = 1;
-
- if (fld_op != FLD_LOOKUP)
- mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
- fld_enter_request(&exp->exp_obd->u.cli);
- rc = ptlrpc_queue_wait(req);
- fld_exit_request(&exp->exp_obd->u.cli);
- if (fld_op != FLD_LOOKUP)
- mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
- if (rc)
- GOTO(out_req, rc);
+ fld_enter_request(&exp->exp_obd->u.cli);
+ rc = ptlrpc_queue_wait(req);
+ fld_exit_request(&exp->exp_obd->u.cli);
+ if (rc)
+ GOTO(out_req, rc);
+
+ if (fld_op == FLD_QUERY) {
+ prange = req_capsule_server_get(&req->rq_pill,
+ &RMF_FLD_MDFLD);
+ if (prange == NULL)
+ GOTO(out_req, rc = -EFAULT);
+ *range = *prange;
+ }
- prange = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
- if (prange == NULL)
- GOTO(out_req, rc = -EFAULT);
- *range = *prange;
- EXIT;
+ EXIT;
out_req:
- ptlrpc_req_finished(req);
- return rc;
+ if (rc != 0 || reqp == NULL) {
+ ptlrpc_req_finished(req);
+ req = NULL;
+ }
+
+ if (reqp != NULL)
+ *reqp = req;
+
+ return rc;
}
int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds,
} else
#endif
{
- rc = fld_client_rpc(target->ft_exp, &res, FLD_LOOKUP);
+ rc = fld_client_rpc(target->ft_exp, &res, FLD_QUERY, NULL);
}
if (rc == 0) {
LUSTRE_MDS_VERSION)
/* FID Location Database handlers */
-#define TGT_FLD_HDL(flags, name, fn) \
- TGT_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name, \
+#define TGT_FLD_HDL_VAR(flags, name, fn) \
+ TGT_RPC_HANDLER(FLD_QUERY, flags, name, fn, NULL, \
LUSTRE_MDS_VERSION)
/* Request with a format known in advance */
__u32 lsr_flags;
};
+struct lu_seq_range_array {
+ __u32 lsra_count;
+ __u32 lsra_padding;
+ struct lu_seq_range lsra_lsr[0];
+};
+
#define LU_SEQ_RANGE_MDT 0x0
#define LU_SEQ_RANGE_OST 0x1
#define LU_SEQ_RANGE_ANY 0x3
extern void lustre_swab_mdt_rec_reint(struct mdt_rec_reint *rr);
+/* lmv structures */
struct lmv_desc {
__u32 ld_tgt_count; /* how many MDS's */
__u32 ld_active_tgt_count; /* how many active */
extern void lustre_swab_lmv_stripe_md(struct lmv_stripe_md *mea);
-/* lmv structures */
#define MEA_MAGIC_LAST_CHAR 0xb2221ca1
#define MEA_MAGIC_ALL_CHARS 0xb222a11c
#define MEA_MAGIC_HASH_SEGMENT 0xb222a11b
#define MAX_HASH_HIGHEST_BIT 0x1000000000000000ULL
enum fld_rpc_opc {
- FLD_QUERY = 900,
- FLD_LAST_OPC,
- FLD_FIRST_OPC = FLD_QUERY
+ FLD_QUERY = 900,
+ FLD_READ = 901,
+ FLD_LAST_OPC,
+ FLD_FIRST_OPC = FLD_QUERY
};
enum seq_rpc_opc {
SEQ_ALLOC_META = 1
};
+enum fld_op {
+ FLD_CREATE = 0,
+ FLD_DELETE = 1,
+ FLD_LOOKUP = 2,
+};
+
/*
* LOV data structures
*/
int server_name2svname(const char *label, char *svname, const char **endptr,
size_t svsize);
int server_name_is_ost(const char *svname);
+int target_name2index(const char *svname, __u32 *idx, const char **endptr);
int lustre_put_lsi(struct super_block *sb);
int lustre_start_simple(char *obdname, char *type, char *uuid,
/* Server methods */
-int seq_server_init(struct lu_server_seq *seq,
+int seq_server_init(const struct lu_env *env,
+ struct lu_server_seq *seq,
struct dt_device *dev,
const char *prefix,
enum lu_mgr_type type,
- struct seq_server_site *ss,
- const struct lu_env *env);
+ struct seq_server_site *ss);
void seq_server_fini(struct lu_server_seq *seq,
const struct lu_env *env);
struct lu_seq_range *out,
const struct lu_env *env);
-int seq_server_set_cli(struct lu_server_seq *seq,
- struct lu_client_seq *cli,
- const struct lu_env *env);
+int seq_server_set_cli(const struct lu_env *env,
+ struct lu_server_seq *seq,
+ struct lu_client_seq *cli);
/* Client methods */
int seq_client_init(struct lu_client_seq *seq,
dst->lsr_flags = be32_to_cpu(src->lsr_flags);
}
+static inline void range_array_cpu_to_le(struct lu_seq_range_array *dst,
+ const struct lu_seq_range_array *src)
+{
+ int i;
+
+ for (i = 0; i < src->lsra_count; i++)
+ range_cpu_to_le(&dst->lsra_lsr[i], &src->lsra_lsr[i]);
+
+ dst->lsra_count = cpu_to_le32(src->lsra_count);
+}
+
+static inline void range_array_le_to_cpu(struct lu_seq_range_array *dst,
+ const struct lu_seq_range_array *src)
+{
+ int i;
+
+ dst->lsra_count = le32_to_cpu(src->lsra_count);
+ for (i = 0; i < dst->lsra_count; i++)
+ range_le_to_cpu(&dst->lsra_lsr[i], &src->lsra_lsr[i]);
+}
+
/** @} fid */
#endif /* __LUSTRE_FID_H */
* Fld service name in form "fld-srv-lustre-MDTXXX" */
char lsf_name[80];
+ /**
+ * Just reformated or upgraded, and this flag is being
+ * used to check whether the local FLDB is needs to be
+ * synced with global FLDB(in MDT0), and it is only needed
+ * if the MDT is upgraded from < 2.6 to 2.6, i.e. when the
+ * local FLDB is being invited */
+ unsigned int lsf_new:1;
};
struct lu_client_fld {
/* Server methods */
int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
- struct dt_device *dt, const char *prefix, int mds_node_id,
- int type);
+ struct dt_device *dt, const char *prefix, int type);
void fld_server_fini(const struct lu_env *env, struct lu_server_fld *fld);
int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
seqno_t seq, struct lu_seq_range *range);
+int fld_local_lookup(const struct lu_env *env, struct lu_server_fld *fld,
+ seqno_t seq, struct lu_seq_range *range);
+
+int fld_update_from_controller(const struct lu_env *env,
+ struct lu_server_fld *fld);
+
/* Client methods */
int fld_client_init(struct lu_client_fld *fld,
const char *prefix, int hash);
/* fid/fld req_format */
extern struct req_format RQF_SEQ_QUERY;
extern struct req_format RQF_FLD_QUERY;
+extern struct req_format RQF_FLD_READ;
/* MDS req_format */
extern struct req_format RQF_MDS_CONNECT;
extern struct req_format RQF_MDS_DISCONNECT;
#define OBD_FAIL_FLD 0x1100
#define OBD_FAIL_FLD_QUERY_NET 0x1101
+#define OBD_FAIL_FLD_READ_NET 0x1102
#define OBD_FAIL_SEC_CTX 0x1200
#define OBD_FAIL_SEC_CTX_INIT_NET 0x1201
return 0;
}
-/*
- * Init client sequence manager which is used by local MDS to talk to sequence
- * controller on remote node.
- */
-static int lod_seq_init_cli(const struct lu_env *env,
- struct lod_device *lod,
- char *tgtuuid, int index)
-{
- struct seq_server_site *ss;
- struct obd_device *osp;
- int rc;
- char *prefix;
- struct obd_uuid obd_uuid;
- ENTRY;
-
- ss = lu_site2seq(lod2lu_dev(lod)->ld_site);
- LASSERT(ss != NULL);
-
- /* check if this is adding the first MDC and controller is not yet
- * initialized. */
- if (index != 0 || ss->ss_client_seq)
- RETURN(0);
-
- obd_str2uuid(&obd_uuid, tgtuuid);
- osp = class_find_client_obd(&obd_uuid, LUSTRE_OSP_NAME,
- &lod->lod_dt_dev.dd_lu_dev.ld_obd->obd_uuid);
- if (osp == NULL) {
- CERROR("%s: can't find %s device\n",
- lod->lod_dt_dev.dd_lu_dev.ld_obd->obd_name,
- tgtuuid);
- RETURN(-EINVAL);
- }
-
- if (!osp->obd_set_up) {
- CERROR("target %s not set up\n", osp->obd_name);
- rc = -EINVAL;
- }
-
- LASSERT(ss->ss_control_exp);
- OBD_ALLOC_PTR(ss->ss_client_seq);
- if (ss->ss_client_seq == NULL)
- RETURN(-ENOMEM);
-
- OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
- if (!prefix) {
- OBD_FREE_PTR(ss->ss_client_seq);
- ss->ss_client_seq = NULL;
- RETURN(-ENOMEM);
- }
-
- snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", osp->obd_name);
- rc = seq_client_init(ss->ss_client_seq, ss->ss_control_exp,
- LUSTRE_SEQ_METADATA, prefix, NULL);
- OBD_FREE(prefix, MAX_OBD_NAME + 5);
- if (rc) {
- OBD_FREE_PTR(ss->ss_client_seq);
- ss->ss_client_seq = NULL;
- RETURN(rc);
- }
-
- LASSERT(ss->ss_server_seq != NULL);
- rc = seq_server_set_cli(ss->ss_server_seq, ss->ss_client_seq,
- env);
-
- RETURN(rc);
-}
-
-static void lod_seq_fini_cli(struct lod_device *lod)
-{
- struct seq_server_site *ss;
-
- ENTRY;
-
- ss = lu_site2seq(lod2lu_dev(lod)->ld_site);
- if (ss == NULL) {
- EXIT;
- return;
- }
-
- if (ss->ss_server_seq)
- seq_server_set_cli(ss->ss_server_seq,
- NULL, NULL);
-
- if (ss->ss_control_exp) {
- class_export_put(ss->ss_control_exp);
- ss->ss_control_exp = NULL;
- }
-
- EXIT;
- return;
-}
-
/**
* Procss config log on LOD
* \param env environment info
mdt_index = index;
rc = lod_add_device(env, lod, arg1, index, gen,
mdt_index, LUSTRE_MDC_NAME, 1);
- if (rc == 0)
- rc = lod_seq_init_cli(env, lod, arg1,
- mdt_index);
} else if (lcfg->lcfg_command == LCFG_LOV_ADD_INA) {
/*FIXME: Add mdt_index for LCFG_LOV_ADD_INA*/
mdt_index = 0;
lu_dev_del_linkage(dev->ld_site, dev);
lod_cleanup_desc_tgts(env, lod, &lod->lod_mdt_descs, lcfg);
lod_cleanup_desc_tgts(env, lod, &lod->lod_ost_descs, lcfg);
-
- lod_seq_fini_cli(lod);
-
if (lcfg->lcfg_command == LCFG_PRE_CLEANUP)
break;
/*
RETURN(rc);
}
-static int mdt_seq_fini(const struct lu_env *env,
- struct mdt_device *m)
+static void mdt_deregister_seq_exp(struct mdt_device *mdt)
{
- return seq_site_fini(env, mdt_seq_site(m));
+ struct seq_server_site *ss = mdt_seq_site(mdt);
+
+ if (ss->ss_node_id == 0)
+ return;
+
+ if (ss->ss_client_seq != NULL) {
+ lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
+ ss->ss_client_seq->lcs_exp = NULL;
+ }
+
+ if (ss->ss_server_fld != NULL) {
+ lustre_deregister_lwp_item(&ss->ss_server_fld->lsf_control_exp);
+ ss->ss_server_fld->lsf_control_exp = NULL;
+ }
}
-static int mdt_seq_init(const struct lu_env *env,
- const char *uuid,
- struct mdt_device *m)
+static void mdt_seq_fini_cli(struct mdt_device *mdt)
{
- struct seq_server_site *ss;
- char *prefix;
- int rc;
+ struct seq_server_site *ss = mdt_seq_site(mdt);
+
+ if (ss == NULL)
+ return;
+
+ if (ss->ss_server_seq == NULL)
+ seq_server_set_cli(NULL, ss->ss_server_seq, NULL);
+
+ return;
+}
+
+static int mdt_seq_fini(const struct lu_env *env, struct mdt_device *mdt)
+{
+ mdt_seq_fini_cli(mdt);
+ mdt_deregister_seq_exp(mdt);
+
+ return seq_site_fini(env, mdt_seq_site(mdt));
+}
+
+/**
+ * It will retrieve its FLDB entries from MDT0, and it only happens
+ * when upgrading existent FS to 2.6 or when local FLDB is corrupted,
+ * and it needs to refresh FLDB from the MDT0.
+ **/
+static int mdt_register_lwp_callback(void *data)
+{
+ struct lu_env env;
+ struct mdt_device *mdt = data;
+ struct lu_server_fld *fld = mdt_seq_site(mdt)->ss_server_fld;
+ int rc;
ENTRY;
- ss = mdt_seq_site(m);
+ LASSERT(mdt_seq_site(mdt)->ss_node_id != 0);
- /*
- * This is sequence-controller node. Init seq-controller server on local
- * MDT.
- */
- if (ss->ss_node_id == 0) {
- LASSERT(ss->ss_control_seq == NULL);
+ if (!likely(fld->lsf_new))
+ RETURN(0);
- OBD_ALLOC_PTR(ss->ss_control_seq);
- if (ss->ss_control_seq == NULL)
- RETURN(-ENOMEM);
+ rc = lu_env_init(&env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: cannot init env: rc = %d\n", mdt_obd_name(mdt), rc);
+ RETURN(rc);
+ }
- rc = seq_server_init(ss->ss_control_seq,
- m->mdt_bottom, uuid,
- LUSTRE_SEQ_CONTROLLER,
- ss,
- env);
+ rc = fld_update_from_controller(&env, fld);
+ if (rc != 0) {
+ CERROR("%s: cannot update controller: rc = %d\n",
+ mdt_obd_name(mdt), rc);
+ GOTO(out, rc);
+ }
+out:
+ lu_env_fini(&env);
+ RETURN(rc);
+}
- if (rc)
- GOTO(out_seq_fini, rc);
+static int mdt_register_seq_exp(struct mdt_device *mdt)
+{
+ struct seq_server_site *ss = mdt_seq_site(mdt);
+ char *lwp_name = NULL;
+ int rc;
- OBD_ALLOC_PTR(ss->ss_client_seq);
- if (ss->ss_client_seq == NULL)
- GOTO(out_seq_fini, rc = -ENOMEM);
+ if (ss->ss_node_id == 0)
+ return 0;
- OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
- if (prefix == NULL) {
- OBD_FREE_PTR(ss->ss_client_seq);
- GOTO(out_seq_fini, rc = -ENOMEM);
- }
+ OBD_ALLOC(lwp_name, MAX_OBD_NAME);
+ if (lwp_name == NULL)
+ GOTO(out_free, rc = -ENOMEM);
- snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
- uuid);
+ rc = tgt_name2lwpname(mdt_obd_name(mdt), lwp_name);
+ if (rc != 0)
+ GOTO(out_free, rc);
- /*
- * Init seq-controller client after seq-controller server is
- * ready. Pass ss->ss_control_seq to it for direct talking.
- */
- rc = seq_client_init(ss->ss_client_seq, NULL,
- LUSTRE_SEQ_METADATA, prefix,
- ss->ss_control_seq);
- OBD_FREE(prefix, MAX_OBD_NAME + 5);
+ rc = lustre_register_lwp_item(lwp_name, &ss->ss_client_seq->lcs_exp,
+ NULL, NULL);
+ if (rc != 0)
+ GOTO(out_free, rc);
+
+ rc = lustre_register_lwp_item(lwp_name,
+ &ss->ss_server_fld->lsf_control_exp,
+ mdt_register_lwp_callback, mdt);
+ if (rc != 0) {
+ lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
+ ss->ss_client_seq->lcs_exp = NULL;
+ GOTO(out_free, rc);
+ }
+out_free:
+ if (lwp_name != NULL)
+ OBD_FREE(lwp_name, MAX_OBD_NAME);
+
+ return rc;
+}
+
+/*
+ * Init client sequence manager which is used by local MDS to talk to sequence
+ * controller on remote node.
+ */
+static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
+{
+ struct seq_server_site *ss = mdt_seq_site(mdt);
+ int rc;
+ char *prefix;
+ ENTRY;
+
+ /* check if this is adding the first MDC and controller is not yet
+ * initialized. */
+ OBD_ALLOC_PTR(ss->ss_client_seq);
+ if (ss->ss_client_seq == NULL)
+ RETURN(-ENOMEM);
+
+ OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+ if (prefix == NULL) {
+ OBD_FREE_PTR(ss->ss_client_seq);
+ ss->ss_client_seq = NULL;
+ RETURN(-ENOMEM);
+ }
+ /* Note: seq_client_fini will be called in seq_site_fini */
+ snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt));
+ rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
+ prefix, ss->ss_node_id == 0 ? ss->ss_control_seq :
+ NULL);
+ OBD_FREE(prefix, MAX_OBD_NAME + 5);
+ if (rc != 0) {
+ OBD_FREE_PTR(ss->ss_client_seq);
+ ss->ss_client_seq = NULL;
+ RETURN(rc);
+ }
+
+ rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq);
+
+ RETURN(rc);
+}
+
+static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt)
+{
+ struct seq_server_site *ss;
+ int rc;
+ ENTRY;
+
+ ss = mdt_seq_site(mdt);
+ /* init sequence controller server(MDT0) */
+ if (ss->ss_node_id == 0) {
+ OBD_ALLOC_PTR(ss->ss_control_seq);
+ if (ss->ss_control_seq == NULL)
+ RETURN(-ENOMEM);
+
+ rc = seq_server_init(env, ss->ss_control_seq, mdt->mdt_bottom,
+ mdt_obd_name(mdt), LUSTRE_SEQ_CONTROLLER,
+ ss);
if (rc)
GOTO(out_seq_fini, rc);
}
- /* Init seq-server on local MDT */
- LASSERT(ss->ss_server_seq == NULL);
-
+ /* Init normal sequence server */
OBD_ALLOC_PTR(ss->ss_server_seq);
if (ss->ss_server_seq == NULL)
GOTO(out_seq_fini, rc = -ENOMEM);
- rc = seq_server_init(ss->ss_server_seq,
- m->mdt_bottom, uuid,
- LUSTRE_SEQ_SERVER,
- ss,
- env);
+ rc = seq_server_init(env, ss->ss_server_seq, mdt->mdt_bottom,
+ mdt_obd_name(mdt), LUSTRE_SEQ_SERVER, ss);
if (rc)
- GOTO(out_seq_fini, rc = -ENOMEM);
+ GOTO(out_seq_fini, rc);
- /* Assign seq-controller client to local seq-server. */
- if (ss->ss_node_id == 0) {
- LASSERT(ss->ss_client_seq != NULL);
+ /* init seq client for seq server to talk to seq controller(MDT0) */
+ rc = mdt_seq_init_cli(env, mdt);
+ if (rc != 0)
+ GOTO(out_seq_fini, rc);
- rc = seq_server_set_cli(ss->ss_server_seq,
- ss->ss_client_seq,
- env);
- }
+ if (ss->ss_node_id != 0)
+ /* register controler export through lwp */
+ rc = mdt_register_seq_exp(mdt);
EXIT;
out_seq_fini:
if (rc)
- mdt_seq_fini(env, m);
+ mdt_seq_fini(env, mdt);
return rc;
}
RETURN(rc = -ENOMEM);
rc = fld_server_init(env, ss->ss_server_fld, m->mdt_bottom, uuid,
- ss->ss_node_id, LU_SEQ_RANGE_MDT);
+ LU_SEQ_RANGE_MDT);
if (rc) {
OBD_FREE_PTR(ss->ss_server_fld);
ss->ss_server_fld = NULL;
if (rc)
GOTO(err_fini_stack, rc);
- rc = mdt_seq_init(env, mdt_obd_name(m), m);
+ rc = mdt_seq_init(env, m);
if (rc)
GOTO(err_fini_fld, rc);
}
EXPORT_SYMBOL(server_name_is_ost);
-/* Get the index from the obd name.
- rc = server type, or
- rc < 0 on error
- if endptr isn't NULL it is set to end of name */
-int server_name2index(const char *svname, __u32 *idx, const char **endptr)
+/**
+ * Get the index from the target name MDTXXXX/OSTXXXX
+ * rc = server type, or rc < 0 on error
+ **/
+int target_name2index(const char *tgtname, __u32 *idx, const char **endptr)
{
+ const char *dash = tgtname;
unsigned long index;
int rc;
- const char *dash;
-
- /* We use server_name2fsname() just for parsing */
- rc = server_name2fsname(svname, NULL, &dash);
- if (rc != 0)
- return rc;
-
- dash++;
if (strncmp(dash, "MDT", 3) == 0)
rc = LDD_F_SV_TYPE_MDT;
index = simple_strtoul(dash, (char **)endptr, 16);
if (idx != NULL)
*idx = index;
+ return rc;
+}
+EXPORT_SYMBOL(target_name2index);
+
+/* Get the index from the obd name.
+ rc = server type, or
+ rc < 0 on error
+ if endptr isn't NULL it is set to end of name */
+int server_name2index(const char *svname, __u32 *idx, const char **endptr)
+{
+ const char *dash;
+ int rc;
+
+ /* We use server_name2fsname() just for parsing */
+ rc = server_name2fsname(svname, NULL, &dash);
+ if (rc != 0)
+ return rc;
+
+ dash++;
+ rc = target_name2index(dash, idx, endptr);
+ if (rc < 0)
+ return rc;
/* Account for -mdc after index that is possible when specifying mdt */
if (endptr != NULL && strncmp(LUSTRE_MDC_NAME, *endptr + 1,
GOTO(out_free, rc = -ENOMEM);
}
- rc = seq_server_init(ss->ss_server_seq, ofd->ofd_osd, obd_name,
- LUSTRE_SEQ_SERVER, ss, env);
+ rc = seq_server_init(env, ss->ss_server_seq, ofd->ofd_osd, obd_name,
+ LUSTRE_SEQ_SERVER, ss);
if (rc) {
CERROR("%s : seq server init error %d\n", obd_name, rc);
GOTO(out_free, rc);
OBD_FREE(name, strlen(obd_name) + 10);
name = NULL;
- rc = seq_server_set_cli(ss->ss_server_seq, ss->ss_client_seq, env);
+ rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq);
out_free:
if (rc) {
RETURN(rc = -ENOMEM);
rc = fld_server_init(env, ss->ss_server_fld, ofd->ofd_osd, uuid,
- ss->ss_node_id, LU_SEQ_RANGE_OST);
+ LU_SEQ_RANGE_OST);
if (rc) {
OBD_FREE_PTR(ss->ss_server_fld);
ss->ss_server_fld = NULL;
RETURN(0);
}
+/**
+ * It will retrieve its FLDB entries from MDT0, and it only happens
+ * when upgrading existent FS to 2.6.
+ **/
+static int ofd_register_lwp_callback(void *data)
+{
+ struct lu_env env;
+ struct ofd_device *ofd = data;
+ struct lu_server_fld *fld = ofd->ofd_seq_site.ss_server_fld;
+ int rc;
+ ENTRY;
+
+ if (!likely(fld->lsf_new))
+ RETURN(0);
+
+ rc = lu_env_init(&env, LCT_DT_THREAD);
+ if (rc) {
+ CERROR("%s: cannot init env: rc = %d\n", ofd_name(ofd), rc);
+ RETURN(rc);
+ }
+
+ rc = fld_update_from_controller(&env, fld);
+ if (rc != 0) {
+ CERROR("%s: cannot update controller: rc = %d\n",
+ ofd_name(ofd), rc);
+ GOTO(out, rc);
+ }
+out:
+ lu_env_fini(&env);
+ RETURN(rc);
+}
+
static int ofd_register_seq_exp(struct ofd_device *ofd)
{
struct seq_server_site *ss = &ofd->ofd_seq_site;
rc = lustre_register_lwp_item(lwp_name,
&ss->ss_server_fld->lsf_control_exp,
- NULL, NULL);
+ ofd_register_lwp_callback, ofd);
if (rc != 0) {
lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
ss->ss_client_seq->lcs_exp = NULL;
RETURN(rc);
}
+static int osd_seq_exists(const struct lu_env *env,
+ struct osd_device *osd, obd_seq seq)
+{
+ struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
+ struct seq_server_site *ss = osd_seq_site(osd);
+ int rc;
+ ENTRY;
+
+ if (ss == NULL)
+ RETURN(1);
+
+ rc = osd_fld_lookup(env, osd, seq, range);
+ if (rc != 0) {
+ if (rc != -ENOENT)
+ CERROR("%s: can't lookup FLD sequence "LPX64
+ ": rc = %d\n", osd_name(osd), seq, rc);
+ RETURN(0);
+ }
+
+ RETURN(ss->ss_node_id == range->lsr_index);
+}
+
/*
* Concurrency: shouldn't matter.
*/
obd_seq seq, struct lu_seq_range *range)
{
struct seq_server_site *ss = osd_seq_site(osd);
- int rc;
if (fid_seq_is_idif(seq)) {
fld_range_set_ost(range);
LASSERT(ss != NULL);
fld_range_set_any(range);
- rc = fld_server_lookup(env, ss->ss_server_fld, seq, range);
- if (rc != 0) {
- CERROR("%s: cannot find FLD range for "LPX64": rc = %d\n",
- osd_name(osd), seq, rc);
- }
- return rc;
+ /* OSD will only do local fld lookup */
+ return fld_local_lookup(env, ss->ss_server_fld, seq, range);
}
/*
struct dt_object_format *dof,
struct thandle *handle)
{
- struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct osd_thandle *oh;
int rc;
ENTRY;
if (rc != 0)
RETURN(rc);
- /* It does fld look up inside declare, and the result will be
- * added to fld cache, so the following fld lookup inside insert
- * does not need send RPC anymore, so avoid send rpc with holding
- * transaction */
- if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
- !fid_is_last_id(lu_object_fid(&dt->do_lu)))
- osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
- fid_seq(lu_object_fid(&dt->do_lu)), range);
-
-
RETURN(rc);
}
return rc;
}
-static int osd_mdt_seq_exists(const struct lu_env *env,
- struct osd_device *osd, obd_seq seq)
-{
- struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
- struct seq_server_site *ss = osd_seq_site(osd);
- int rc;
- ENTRY;
-
- if (ss == NULL)
- RETURN(1);
-
- /* XXX: currently, each MDT only store avaible sequence on disk, and no
- * allocated sequences information on disk, so we have to lookup FLDB,
- * but it probably makes more sense also store allocated sequence
- * locally, so we do not need do remote FLDB lookup in OSD */
- rc = osd_fld_lookup(env, osd, seq, range);
- if (rc != 0) {
- CERROR("%s: Can not lookup fld for "LPX64"\n",
- osd_name(osd), seq);
- RETURN(0);
- }
-
- RETURN(ss->ss_node_id == range->lsr_index);
-}
-
static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
struct lu_fid *fid)
{
if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
RETURN(0);
- /* Currently only check this for FID on MDT */
- if (osd_mdt_seq_exists(env, osd, fid_seq(fid)))
+ if (osd_seq_exists(env, osd, fid_seq(fid)))
RETURN(0);
RETURN(1);
int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
const struct lu_fid *fid, enum oi_check_flags flags)
{
+ struct lu_seq_range *range = &info->oti_seq_range;
+ int rc;
ENTRY;
if (flags & OI_KNOWN_ON_OST)
if (!(flags & OI_CHECK_FLD))
RETURN(0);
- if (osd->od_is_ost)
+ rc = osd_fld_lookup(info->oti_env, osd, fid_seq(fid), range);
+ if (rc != 0) {
+ CERROR("%s: "DFID" lookup failed: rc = %d\n", osd_name(osd),
+ PFID(fid), rc);
+ RETURN(rc);
+ }
+
+ if (fld_range_is_ost(range))
RETURN(1);
RETURN(0);
lu_object_put(env, &obj->oo_dt.do_lu);
}
-static int osd_mdt_seq_exists(const struct lu_env *env, struct osd_device *osd,
- obd_seq seq)
+static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
+ obd_seq seq)
{
struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct seq_server_site *ss = osd_seq_site(osd);
if (ss == NULL)
RETURN(1);
- /* XXX: currently, each MDT only store avaible sequence on disk,
- * and no allocated sequences information on disk, so it has to
- * lookup FLDB. It probably makes more sense also store allocated
- * sequence locally, so we do not need do remote FLDB lookup in OSD */
rc = osd_fld_lookup(env, osd, seq, range);
if (rc != 0) {
CERROR("%s: Can not lookup fld for "LPX64"\n",
if (!fid_is_norm(fid) && !fid_is_root(fid))
RETURN(0);
- /* Currently, it only used to check FID on MDT */
- if (osd_mdt_seq_exists(env, osd, fid_seq(fid)))
+ if (osd_seq_exists(env, osd, fid_seq(fid)))
RETURN(0);
RETURN(1);
obd_seq seq, struct lu_seq_range *range)
{
struct seq_server_site *ss = osd_seq_site(osd);
- int rc;
if (fid_seq_is_idif(seq)) {
fld_range_set_ost(range);
LASSERT(ss != NULL);
fld_range_set_any(range);
- rc = fld_server_lookup(env, ss->ss_server_fld, seq, range);
- if (rc != 0)
- CERROR("%s: cannot find FLD range for "LPX64": rc = %d\n",
- osd_name(osd), seq, rc);
- return rc;
+ /* OSD will only do local fld lookup */
+ return fld_local_lookup(env, ss->ss_server_fld, seq, range);
}
int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd,
const struct lu_fid *fid)
{
+ struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
+ int rc;
ENTRY;
if (fid_is_idif(fid))
fid_is_name_llog(fid) || fid_is_quota(fid))
RETURN(0);
- if (osd->od_is_ost)
+ rc = osd_fld_lookup(env, osd, fid_seq(fid), range);
+ if (rc != 0) {
+ CERROR("%s: "DFID" lookup failed: rc = %d\n", osd_name(osd),
+ PFID(fid), rc);
+ RETURN(rc);
+ }
+
+ if (fld_range_is_ost(range))
RETURN(1);
+
RETURN(0);
}
}
ptlrpc_pinger_add_import(imp);
-
- if (osp->opd_connect_mdt && data->ocd_index == 0) {
- /* set seq controller export for MDC0 if exists */
- struct seq_server_site *ss;
-
- ss = lu_site2seq(osp2lu_dev(osp)->ld_site);
- ss->ss_control_exp = class_export_get(*exp);
- ss->ss_server_fld->lsf_control_exp = *exp;
- }
out:
RETURN(rc);
}
struct dt_object, do_lu);
}
+static inline struct seq_server_site *osp_seq_site(struct osp_device *osp)
+{
+ return osp->opd_dt_dev.dd_lu_dev.ld_site->ld_seq_site;
+}
+
#define osp_init_rpc_lock(lck) mdc_init_rpc_lock(lck)
#define osp_get_rpc_lock(lck, it) mdc_get_rpc_lock(lck, it)
#define osp_put_rpc_lock(lck, it) mdc_put_rpc_lock(lck, it)
&RMF_FLD_MDFLD
};
+static const struct req_msg_field *fld_read_client[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_FLD_MDFLD
+};
+
+static const struct req_msg_field *fld_read_server[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_GENERIC_DATA
+};
+
static const struct req_msg_field *mds_getattr_name_client[] = {
&RMF_PTLRPC_BODY,
&RMF_MDT_BODY,
&RQF_MGS_CONFIG_READ,
&RQF_SEQ_QUERY,
&RQF_FLD_QUERY,
+ &RQF_FLD_READ,
&RQF_MDS_CONNECT,
&RQF_MDS_DISCONNECT,
&RQF_MDS_GET_INFO,
DEFINE_REQ_FMT0("FLD_QUERY", fld_query_client, fld_query_server);
EXPORT_SYMBOL(RQF_FLD_QUERY);
+struct req_format RQF_FLD_READ =
+ DEFINE_REQ_FMT0("FLD_READ", fld_read_client, fld_read_server);
+EXPORT_SYMBOL(RQF_FLD_READ);
+
struct req_format RQF_LOG_CANCEL =
DEFINE_REQ_FMT0("OBD_LOG_CANCEL", log_cancel_client, empty);
EXPORT_SYMBOL(RQF_LOG_CANCEL);
{ SEC_CTX_INIT_CONT,"sec_ctx_init_cont" },
{ SEC_CTX_FINI, "sec_ctx_fini" },
{ FLD_QUERY, "fld_query" },
+ { FLD_READ, "fld_read" },
{ UPDATE_OBJ, "update_obj" },
};
(long long)MDS_ATTR_BLOCKS);
LASSERTF(FLD_QUERY == 900, "found %lld\n",
(long long)FLD_QUERY);
+ LASSERTF(FLD_READ == 901, "found %lld\n",
+ (long long)FLD_READ);
LASSERTF(FLD_FIRST_OPC == 900, "found %lld\n",
(long long)FLD_FIRST_OPC);
- LASSERTF(FLD_LAST_OPC == 901, "found %lld\n",
+ LASSERTF(FLD_LAST_OPC == 902, "found %lld\n",
(long long)FLD_LAST_OPC);
LASSERTF(SEQ_QUERY == 700, "found %lld\n",
(long long)SEQ_QUERY);
CHECK_VALUE_64X(MDS_ATTR_BLOCKS);
CHECK_VALUE(FLD_QUERY);
+ CHECK_VALUE(FLD_READ);
CHECK_VALUE(FLD_FIRST_OPC);
CHECK_VALUE(FLD_LAST_OPC);
return ret;
}
+
void lustre_assert_wire_constants(void)
{
/* Wire protocol assertions generated by 'wirecheck'
* (make -C lustre/utils newwiretest)
- * running on Linux deva 2.6.32-358.18.1.el6_lustre.gdf685d2.x86_64 #1 SMP Sat Aug 31 20:41:4
- * with gcc version 4.4.4 20100726 (Red Hat 4.4.4-13) (GCC) */
+ * running on Linux testnode 2.6.32 #3 SMP Thu Sep 13 12:42:57 PDT 2012 x86_64 x86_64 x86_64
+ * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC) */
/* Constants... */
(long long)MDS_ATTR_BLOCKS);
LASSERTF(FLD_QUERY == 900, "found %lld\n",
(long long)FLD_QUERY);
+ LASSERTF(FLD_READ == 901, "found %lld\n",
+ (long long)FLD_READ);
LASSERTF(FLD_FIRST_OPC == 900, "found %lld\n",
(long long)FLD_FIRST_OPC);
- LASSERTF(FLD_LAST_OPC == 901, "found %lld\n",
+ LASSERTF(FLD_LAST_OPC == 902, "found %lld\n",
(long long)FLD_LAST_OPC);
LASSERTF(SEQ_QUERY == 700, "found %lld\n",
(long long)SEQ_QUERY);