* in the LICENSE file that accompanied this code).
*
* You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see [sun.com URL with a
- * copy of GPLv2].
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
#include <lustre_mdc.h>
#include "fld_internal.h"
-/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c
+/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c
* It should be common thing. The same about mdc RPC lock */
static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
{
spin_lock(&cli->cl_loi_list_lock);
cli->cl_r_in_flight--;
list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-
+
if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
/* No free request slots anymore */
break;
RETURN(NULL);
}
-static int fld_dht_hash(struct lu_client_fld *fld,
- seqno_t seq)
-{
- /* XXX: here should be DHT hash */
- return fld_rrb_hash(fld, seq);
-}
-
-static struct lu_fld_target *
-fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
-{
- /* XXX: here should be DHT scan code */
- return fld_rrb_scan(fld, seq);
-}
-
-struct lu_fld_hash fld_hash[3] = {
- {
- .fh_name = "DHT",
- .fh_hash_func = fld_dht_hash,
- .fh_scan_func = fld_dht_scan
- },
+struct lu_fld_hash fld_hash[] = {
{
.fh_name = "RRB",
.fh_hash_func = fld_rrb_hash,
int fld_client_init(struct lu_client_fld *fld,
const char *prefix, int hash)
{
-#ifdef __KERNEL__
int cache_size, cache_threshold;
-#endif
int rc;
ENTRY;
fld->lcf_flags = LUSTRE_FLD_INIT;
CFS_INIT_LIST_HEAD(&fld->lcf_targets);
-#ifdef __KERNEL__
cache_size = FLD_CLIENT_CACHE_SIZE /
sizeof(struct fld_cache_entry);
FLD_CLIENT_CACHE_THRESHOLD / 100;
fld->lcf_cache = fld_cache_init(fld->lcf_name,
- FLD_CLIENT_HTABLE_SIZE,
cache_size, cache_threshold);
if (IS_ERR(fld->lcf_cache)) {
rc = PTR_ERR(fld->lcf_cache);
fld->lcf_cache = NULL;
GOTO(out, rc);
}
-#endif
rc = fld_client_proc_init(fld);
if (rc)
}
spin_unlock(&fld->lcf_lock);
-#ifdef __KERNEL__
if (fld->lcf_cache != NULL) {
if (!IS_ERR(fld->lcf_cache))
fld_cache_fini(fld->lcf_cache);
fld->lcf_cache = NULL;
}
-#endif
EXIT;
}
EXPORT_SYMBOL(fld_client_fini);
-static int fld_client_rpc(struct obd_export *exp,
- struct md_fld *mf, __u32 fld_op)
+int fld_client_rpc(struct obd_export *exp,
+ struct lu_seq_range *range, __u32 fld_op)
{
struct ptlrpc_request *req;
- struct md_fld *pmf;
+ struct lu_seq_range *prange;
__u32 *op;
int rc;
ENTRY;
op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
*op = fld_op;
- pmf = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
- *pmf = *mf;
+ prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
+ *prange = *range;
ptlrpc_request_set_replen(req);
req->rq_request_portal = FLD_REQUEST_PORTAL;
if (rc)
GOTO(out_req, rc);
- pmf = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
- if (pmf == NULL)
+ prange = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
+ if (prange == NULL)
GOTO(out_req, rc = -EFAULT);
- *mf = *pmf;
+ *range = *prange;
EXIT;
out_req:
ptlrpc_req_finished(req);
return rc;
}
-int fld_client_create(struct lu_client_fld *fld,
- seqno_t seq, mdsno_t mds,
- const struct lu_env *env)
-{
- struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
- struct lu_fld_target *target;
- int rc;
- ENTRY;
-
- fld->lcf_flags |= LUSTRE_FLD_RUN;
- target = fld_client_get_target(fld, seq);
- LASSERT(target != NULL);
-
- CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: "
- LPU64") on target %s (idx "LPU64")\n", fld->lcf_name,
- seq, mds, fld_target_name(target), target->ft_idx);
-
-#ifdef __KERNEL__
- if (target->ft_srv != NULL) {
- LASSERT(env != NULL);
- rc = fld_server_create(target->ft_srv, env, seq, mds);
- } else {
-#endif
- rc = fld_client_rpc(target->ft_exp, &md_fld, FLD_CREATE);
-#ifdef __KERNEL__
- }
-#endif
-
- if (rc == 0) {
- /*
- * Do not return result of calling fld_cache_insert()
- * here. First of all because it may return -EEXISTS. Another
- * reason is that, we do not want to stop proceeding because of
- * cache errors.
- */
- fld_cache_insert(fld->lcf_cache, seq, mds);
- } else {
- CERROR("%s: Can't create FLD entry, rc %d\n",
- fld->lcf_name, rc);
- }
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_create);
-
-int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
- const struct lu_env *env)
-{
- struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
- struct lu_fld_target *target;
- int rc;
- ENTRY;
-
- fld->lcf_flags |= LUSTRE_FLD_RUN;
- fld_cache_delete(fld->lcf_cache, seq);
-
- target = fld_client_get_target(fld, seq);
- LASSERT(target != NULL);
-
- CDEBUG(D_INFO, "%s: Delete fld entry (seq: "LPX64") on "
- "target %s (idx "LPU64")\n", fld->lcf_name, seq,
- fld_target_name(target), target->ft_idx);
-
-#ifdef __KERNEL__
- if (target->ft_srv != NULL) {
- LASSERT(env != NULL);
- rc = fld_server_delete(target->ft_srv,
- env, seq);
- } else {
-#endif
- rc = fld_client_rpc(target->ft_exp,
- &md_fld, FLD_DELETE);
-#ifdef __KERNEL__
- }
-#endif
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_delete);
-
int fld_client_lookup(struct lu_client_fld *fld,
seqno_t seq, mdsno_t *mds,
const struct lu_env *env)
{
- struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+ struct lu_seq_range res;
struct lu_fld_target *target;
int rc;
ENTRY;
fld->lcf_flags |= LUSTRE_FLD_RUN;
- rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
- if (rc == 0)
+ rc = fld_cache_lookup(fld->lcf_cache, seq, &res);
+ if (rc == 0) {
+ *mds = res.lsr_mdt;
RETURN(0);
+ }
/* Can not find it in the cache */
target = fld_client_get_target(fld, seq);
"target %s (idx "LPU64")\n", fld->lcf_name, seq,
fld_target_name(target), target->ft_idx);
+ res.lsr_start = seq;
#ifdef __KERNEL__
if (target->ft_srv != NULL) {
LASSERT(env != NULL);
rc = fld_server_lookup(target->ft_srv,
- env, seq, &md_fld.mf_mds);
+ env, seq, &res);
} else {
#endif
- /*
- * insert the 'inflight' sequence. No need to protect that,
- * we are trying to reduce numbers of RPC but not restrict
- * to them exactly one
- */
- fld_cache_insert_inflight(fld->lcf_cache, seq);
rc = fld_client_rpc(target->ft_exp,
- &md_fld, FLD_LOOKUP);
+ &res, FLD_LOOKUP);
#ifdef __KERNEL__
}
#endif
- if (seq < FID_SEQ_START) {
- /*
- * The current solution for IGIF is to bind it to mds0.
- * In the future, this should be fixed once IGIF can be found
- * in FLD.
- */
- md_fld.mf_mds = 0;
- rc = 0;
- }
if (rc == 0) {
- *mds = md_fld.mf_mds;
+ *mds = res.lsr_mdt;
- /*
- * Do not return error here as well. See previous comment in
- * same situation in function fld_client_create().
- */
- fld_cache_insert(fld->lcf_cache, seq, *mds);
- } else {
- /* remove 'inflight' seq if it exists */
- fld_cache_delete(fld->lcf_cache, seq);
+ fld_cache_insert(fld->lcf_cache, &res);
}
RETURN(rc);
}
void fld_client_flush(struct lu_client_fld *fld)
{
-#ifdef __KERNEL__
fld_cache_flush(fld->lcf_cache);
-#endif
}
EXPORT_SYMBOL(fld_client_flush);