1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fld/fld_handler.c
6 * Copyright (C) 2006 Cluster File Systems, Inc.
7 * Author: WangDi <wangdi@clusterfs.com>
8 * Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_FLD
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
56 /* XXX: maybe these 2 items should go to sbi */
/* Module-wide FLD cache descriptor. It starts out NULL; fld_init() allocates
 * it and fld_fini() releases it. The client helpers in this file pass this
 * pointer to fld_cache_insert(), fld_cache_lookup() and fld_cache_delete(). */
57 struct fld_cache_info *fld_cache = NULL;
/* Hash-table geometry. The bucket count is a power of two derived from
 * FLD_HTABLE_BITS, so the mask is simply "size - 1". fld_init() uses
 * FLD_HTABLE_SIZE to size the bucket array and stores FLD_HTABLE_MASK in
 * fld_hash_mask. */
61 FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS),
62 FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1
/* Map a 64-bit sequence number to a 32-bit hash value. The cache helpers in
 * this file AND the result with fld_hash_mask to select a bucket. */
65 static __u32 fld_cache_hash(__u64 seq)
/* Add an entry for sequence @seq to the hash table held in @fld_cache.
 * The bucket is chosen as fld_cache_hash(seq) masked with fld_hash_mask.
 * The bucket is scanned with fld_lock held; when an entry with the same
 * sequence is already present, rc is set to -EEXIST and control jumps to the
 * "exit" label. Otherwise the entry is linked at the head of the bucket. */
71 fld_cache_insert(struct fld_cache_info *fld_cache,
74 struct fld_cache *fld;
75 struct hlist_head *bucket;
76 struct hlist_node *scan;
80 bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
81 fld_cache->fld_hash_mask);
87 INIT_HLIST_NODE(&fld->fld_list);
/* NOTE(review): the scan below uses `fld` as its iteration cursor, which
 * overwrites the pointer whose list node was initialised just above. After a
 * non-empty scan `fld` refers to an entry that is already in the bucket, so
 * both the hlist_add_head() and the OBD_FREE() further down may operate on
 * the wrong object. A dedicated cursor variable looks necessary - confirm. */
91 spin_lock(&fld_cache->fld_lock);
92 hlist_for_each_entry(fld, scan, bucket, fld_list) {
93 if (fld->fld_seq == seq) {
/* Duplicate sequence: drop the lock before leaving through the error path. */
94 spin_unlock(&fld_cache->fld_lock);
95 GOTO(exit, rc = -EEXIST);
98 hlist_add_head(&fld->fld_list, bucket);
99 spin_unlock(&fld_cache->fld_lock);
102 OBD_FREE(fld, sizeof(*fld));
/* Search @fld_cache for the entry whose fld_seq equals @seq.
 * The bucket is selected the same way as in fld_cache_insert(): the hash of
 * the sequence masked with fld_hash_mask. The bucket is walked with fld_lock
 * held; the lock is dropped both on a match and after an unsuccessful scan. */
106 static struct fld_cache *
107 fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
109 struct hlist_head *bucket;
110 struct hlist_node *scan;
111 struct fld_cache *fld;
114 bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
115 fld_cache->fld_hash_mask);
117 spin_lock(&fld_cache->fld_lock);
118 hlist_for_each_entry(fld, scan, bucket, fld_list) {
119 if (fld->fld_seq == seq) {
/* NOTE(review): the lock is released inside the match branch, so an entry
 * handed back to the caller is used without fld_lock held. Confirm that a
 * concurrent fld_cache_delete() cannot invalidate it in the meantime. */
120 spin_unlock(&fld_cache->fld_lock);
124 spin_unlock(&fld_cache->fld_lock);
/* Unlink the cache entry for sequence @seq from @fld_cache, if one exists.
 * Both the bucket scan and the hlist_del_init() run with fld_lock held. */
130 fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
132 struct hlist_head *bucket;
133 struct hlist_node *scan;
134 struct fld_cache *fld;
137 bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
138 fld_cache->fld_hash_mask);
140 spin_lock(&fld_cache->fld_lock);
141 hlist_for_each_entry(fld, scan, bucket, fld_list) {
142 if (fld->fld_seq == seq) {
/* Take the matching entry out of its bucket and reinitialise its node. */
143 hlist_del_init(&fld->fld_list);
150 spin_unlock(&fld_cache->fld_lock);
/* "Round Robin" placement function (see the fld_hash[] table).
 * do_div() divides its first argument in place and evaluates to the
 * remainder, so the value returned is seq modulo fld->fld_count.
 * NOTE(review): fld_count is not checked for zero here - confirm that callers
 * only hash once at least one export has been registered. */
155 static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
157 return do_div(seq, fld->fld_count);
/* Placeholder for a DHT-style placement function. For now it emits a warning
 * on every call and then performs the same computation as fld_rrb_hash():
 * the remainder of seq divided by fld->fld_count. */
160 static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
162 CWARN("using Round Robin hash func for while\n");
163 return do_div(seq, fld->fld_count);
/* Table of available placement functions. fld_client_init() indexes it with
 * its `hash` argument and stores a pointer to the selected slot in
 * lu_client_fld::fld_hash. Each slot pairs a printable name (fh_name) with
 * the function itself (fh_func). */
166 static struct lu_fld_hash fld_hash[3] = {
169 .fh_func = fld_dht_hash
172 .fh_name = "Round Robin",
173 .fh_func = fld_rrb_hash
/* Choose the export that should serve sequence @seq.
 * The configured placement function (fld->fld_hash->fh_func) is evaluated
 * first, then the list of registered exports is walked with fld->fld_lock
 * held. Presumably the hash value decides which list member is picked -
 * TODO confirm against the loop body. */
180 static struct obd_export *
181 fld_client_get_exp(struct lu_client_fld *fld, __u64 seq)
183 struct obd_export *fld_exp;
187 hash = fld->fld_hash->fh_func(fld, seq);
189 spin_lock(&fld->fld_lock);
190 list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
195 spin_unlock(&fld->fld_lock);
200 /* add export to FLD. This is usually done by CMM and LMV, as they are the main users of FLD. */
/* Register @exp with the client FLD instance @fld.
 * @exp must not be NULL (asserted). With fld->fld_lock held, the existing
 * exports are compared against @exp by client UUID; the lock is dropped in
 * the duplicate branch. Otherwise a reference is taken on @exp with
 * class_export_get() and it is appended via its exp_fld_chain link. */
202 int fld_client_add_export(struct lu_client_fld *fld,
203 struct obd_export *exp)
205 struct obd_export *fld_exp;
208 LASSERT(exp != NULL);
210 spin_lock(&fld->fld_lock);
211 list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
212 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
213 &exp->exp_client_uuid))
215 spin_unlock(&fld->fld_lock);
/* The list holds its own reference on every export it links. */
220 fld_exp = class_export_get(exp);
221 list_add_tail(&exp->exp_fld_chain,
225 spin_unlock(&fld->fld_lock);
229 EXPORT_SYMBOL(fld_client_add_export);
231 /* Remove the export whose client UUID matches @exp from @fld. The list is
 * walked with fld->fld_lock held, using the _safe iterator because the
 * matching element is unlinked during the walk. The lock is released both
 * inside the match branch and after an unsuccessful scan. */
232 int fld_client_del_export(struct lu_client_fld *fld,
233 struct obd_export *exp)
235 struct obd_export *fld_exp;
236 struct obd_export *tmp;
239 spin_lock(&fld->fld_lock);
240 list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
241 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
242 &exp->exp_client_uuid))
245 list_del(&fld_exp->exp_fld_chain);
/* NOTE(review): fld_client_add_export() already took a reference with
 * class_export_get(). Taking another one here, instead of dropping it, looks
 * like a reference leak - confirm whether class_export_put() was intended. */
246 class_export_get(fld_exp);
248 spin_unlock(&fld->fld_lock);
252 spin_unlock(&fld->fld_lock);
256 EXPORT_SYMBOL(fld_client_del_export);
/* Initialise a client FLD instance.
 * @fld  instance to set up; must not be NULL (asserted).
 * @hash index of the placement function in fld_hash[]; values outside
 *       [0, LUSTRE_CLI_FLD_HASH_LAST) are reported with CERROR.
 * Sets up the empty export list and its lock, selects the placement
 * function, and logs its name. */
258 int fld_client_init(struct lu_client_fld *fld, int hash)
263 LASSERT(fld != NULL);
265 if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
266 CERROR("wrong hash function 0x%x\n", hash);
270 INIT_LIST_HEAD(&fld->fld_exports);
271 spin_lock_init(&fld->fld_lock);
/* NOTE(review): fld_hash[] is declared with 3 slots; confirm that
 * LUSTRE_CLI_FLD_HASH_LAST never exceeds that size. */
272 fld->fld_hash = &fld_hash[hash];
275 CDEBUG(D_INFO, "Client FLD initialized, using %s\n",
276 fld->fld_hash->fh_name);
279 EXPORT_SYMBOL(fld_client_init);
/* Tear down a client FLD instance: with fld->fld_lock held, every export is
 * unlinked from fld->fld_exports (the _safe iterator is used because elements
 * are removed during the walk), then a debug message is logged. */
281 void fld_client_fini(struct lu_client_fld *fld)
283 struct obd_export *fld_exp;
284 struct obd_export *tmp;
287 spin_lock(&fld->fld_lock);
288 list_for_each_entry_safe(fld_exp, tmp,
289 &fld->fld_exports, exp_fld_chain) {
291 list_del(&fld_exp->exp_fld_chain);
/* NOTE(review): this takes an additional reference on an export that is being
 * dropped from the list. fld_client_add_export() acquired its reference with
 * class_export_get(), so class_export_put() is probably what was meant here -
 * confirm. */
292 class_export_get(fld_exp);
294 spin_unlock(&fld->fld_lock);
295 CDEBUG(D_INFO, "Client FLD finalized\n");
298 EXPORT_SYMBOL(fld_client_fini);
/* Issue one FLD_QUERY request over @exp.
 * @exp     export whose client import is used to prepare the request.
 * @mf      md_fld record copied into the request.
 * @fld_op  operation code; callers in this file pass FLD_CREATE, FLD_DELETE
 *          or FLD_GET.
 * The request is described by two buffer sizes (operation word and md_fld
 * record); the reply is sized for a single md_fld record. */
301 fld_client_rpc(struct obd_export *exp,
302 struct md_fld *mf, __u32 fld_op)
304 struct ptlrpc_request *req;
306 int mf_size = sizeof(*mf);
308 int size[2] = {sizeof(*op), mf_size}, rc;
311 req = ptlrpc_prep_req(class_exp2cliimp(exp),
312 LUSTRE_MDS_VERSION, FLD_QUERY,
/* Request buffer 0 is the operation word, buffer 1 the md_fld payload. */
317 op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
320 pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
321 memcpy(pmf, mf, sizeof(*mf));
/* Size the expected reply, direct the request at the FLD portal, and submit
 * it; the status is collected in rc. */
323 req->rq_replen = lustre_msg_size(1, &mf_size);
324 req->rq_request_portal = MDS_FLD_PORTAL;
325 rc = ptlrpc_queue_wait(req);
/* Fetch the md_fld record from reply buffer 0. */
329 pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
/* Release the request. */
333 ptlrpc_req_finished(req);
/* Create the mapping for sequence @seq with MDS value @mds.
 * The export responsible for @seq is obtained from fld_client_get_exp(), an
 * FLD_CREATE request is sent through it, and the pair is also recorded in the
 * module-wide cache. */
338 fld_client_create(struct lu_client_fld *fld,
339 __u64 seq, __u64 mds)
342 struct obd_export *fld_exp;
343 struct md_fld md_fld;
347 fld_exp = fld_client_get_exp(fld, seq);
354 rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
/* The status of the cache insertion is not captured here. */
357 fld_cache_insert(fld_cache, seq, mds);
364 EXPORT_SYMBOL(fld_client_create);
/* Delete the mapping for sequence @seq.
 * The local cache entry is dropped first; afterwards an FLD_DELETE request is
 * sent through the export selected for @seq. */
367 fld_client_delete(struct lu_client_fld *fld,
368 __u64 seq, __u64 mds)
370 struct obd_export *fld_exp;
371 struct md_fld md_fld;
375 fld_cache_delete(fld_cache, seq);
378 fld_exp = fld_client_get_exp(fld, seq);
385 rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
388 EXPORT_SYMBOL(fld_client_delete);
/* Ask the server which MDS owns sequence @seq.
 * An FLD_GET request is sent through the export selected for @seq, and the
 * mf_mds field of the local md_fld record is stored through @mds. This
 * function does not consult the local cache; fld_client_lookup() does. */
391 fld_client_get(struct lu_client_fld *fld,
392 __u64 seq, __u64 *mds)
394 struct obd_export *fld_exp;
395 struct md_fld md_fld;
398 fld_exp = fld_client_get_exp(fld, seq);
403 vallen = sizeof(struct md_fld);
405 rc = fld_client_rpc(fld_exp, &md_fld, FLD_GET);
407 *mds = md_fld.mf_mds;
412 /* Resolve sequence @seq to its MDS value, returned through @mds. The local
 * cache is consulted first; on a hit the cached fld_mds is used. Otherwise
 * the server is queried with fld_client_get() and, further down, the result
 * is added to the cache with fld_cache_insert(), whose status lands in rc. */
414 fld_client_lookup(struct lu_client_fld *fld,
415 __u64 seq, __u64 *mds)
419 struct fld_cache *fld_entry;
425 /* lookup it in the cache */
426 fld_entry = fld_cache_lookup(fld_cache, seq);
427 if (fld_entry != NULL) {
428 *mds = fld_entry->fld_mds;
433 /* can not find it in the cache */
434 rc = fld_client_get(fld, seq, mds);
439 rc = fld_cache_insert(fld_cache, seq, *mds);
447 EXPORT_SYMBOL(fld_client_lookup);
/* Allocate the module-wide client cache: the descriptor itself, then its
 * bucket array of FLD_HTABLE_SIZE heads, and initialise the cache lock. */
450 static int fld_init(void)
454 OBD_ALLOC_PTR(fld_cache);
455 if (fld_cache == NULL)
458 /* init fld cache info */
459 fld_cache->fld_hash_mask = FLD_HTABLE_MASK;
/* NOTE(review): unlike the descriptor allocation above, no NULL check of
 * fld_cache->fld_hash is visible after this allocation. The cache helpers
 * index into this array unconditionally, so a failed allocation needs
 * handling - confirm and add it if missing. */
460 OBD_ALLOC(fld_cache->fld_hash, FLD_HTABLE_SIZE *
461 sizeof fld_cache->fld_hash[0]);
462 spin_lock_init(&fld_cache->fld_lock);
464 CDEBUG(D_INFO, "Client FLD, cache size %d\n",
/* Release the module-wide client cache, if it was allocated: first the bucket
 * array (same size expression as in fld_init()), then the descriptor.
 * NOTE(review): no freeing of entries still linked in the buckets is visible
 * here - confirm that they are released elsewhere. */
470 static int fld_fini(void)
472 if (fld_cache != NULL) {
473 OBD_FREE(fld_cache->fld_hash, FLD_HTABLE_SIZE *
474 sizeof fld_cache->fld_hash[0]);
475 OBD_FREE_PTR(fld_cache);
/* Module load hook; registered through cfs_module() at the end of the file. */
480 static int __init fld_mod_init(void)
/* Module unload hook; registered through cfs_module() at the end of the file. */
486 static void __exit fld_mod_exit(void)
/* Server-side list of fld_item entries together with its spinlock. Both
 * members are (re)initialised by fld_server_init(), and the list is emptied
 * under the lock by fld_server_fini(). */
493 static struct fld_list fld_list_head;
/* Execute one FLD operation on the server.
 * Depending on the requested operation (presumably selected by @opts - TODO
 * confirm), the record in @mf is inserted (mf_seq -> mf_mds), deleted (by
 * mf_seq), or looked up; a lookup writes its result into mf->mf_mds. The
 * status of the chosen helper is stored in rc. */
496 fld_server_handle(struct lu_server_fld *fld,
497 const struct lu_context *ctx,
498 __u32 opts, struct md_fld *mf)
505 rc = fld_handle_insert(fld, ctx, mf->mf_seq, mf->mf_mds);
508 rc = fld_handle_delete(fld, ctx, mf->mf_seq);
511 rc = fld_handle_lookup(fld, ctx, mf->mf_seq, &mf->mf_mds);
/* Unpack an FLD_QUERY request and run it.
 * A request capsule is initialised for the server side with the
 * RQF_FLD_QUERY format and packed. The operation word and the md_fld record
 * are read from the client part, the reply md_fld is obtained from the server
 * part, and the operation is carried out by fld_server_handle(). Failures to
 * obtain the request or reply record yield -EPROTO. The capsule is finalised
 * before returning. */
522 fld_req_handle0(const struct lu_context *ctx,
523 struct lu_server_fld *fld,
524 struct ptlrpc_request *req)
526 int rep_buf_size[3] = { 0, };
527 struct req_capsule pill;
534 req_capsule_init(&pill, req, RCL_SERVER,
537 req_capsule_set(&pill, &RQF_FLD_QUERY);
538 req_capsule_pack(&pill);
540 opc = req_capsule_client_get(&pill, &RMF_FLD_OPC);
542 in = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
544 CERROR("cannot unpack fld request\n");
545 GOTO(out_pill, rc = -EPROTO);
547 out = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
549 CERROR("cannot allocate fld response\n");
550 GOTO(out_pill, rc = -EPROTO);
/* NOTE(review): the handler is given the reply record `out`; presumably the
 * request record `in` has been copied into it beforehand - confirm. */
553 rc = fld_server_handle(fld, ctx, *opc, out);
555 CERROR("cannot unpack FLD operation\n");
560 req_capsule_fini(&pill);
/* Service entry point for requests arriving on the FLD portal; it is passed
 * to ptlrpc_init_svc_conf() by fld_server_init().
 * - FLD_QUERY on a connected export is forwarded to fld_req_handle0() using
 *   the lu_site's ls_fld instance.
 * - FLD_QUERY without an export is rejected with -ENOTCONN.
 * - Any other opcode sets rq_status to -ENOTSUPP and goes through
 *   ptlrpc_error().
 * The reply is sent with target_send_reply(). */
565 static int fld_req_handle(struct ptlrpc_request *req)
567 int fail = OBD_FAIL_FLD_ALL_REPLY_NET;
568 const struct lu_context *ctx;
569 struct lu_site *site;
573 OBD_FAIL_RETURN(OBD_FAIL_FLD_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
/* The lu_context comes from the service thread handling this request. */
575 ctx = req->rq_svc_thread->t_ctx;
576 LASSERT(ctx != NULL);
577 LASSERT(ctx->lc_thread == req->rq_svc_thread);
578 if (req->rq_reqmsg->opc == FLD_QUERY) {
579 if (req->rq_export != NULL) {
/* Reach the site through export -> obd -> lu_device. */
580 site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
581 LASSERT(site != NULL);
582 rc = fld_req_handle0(ctx, site->ls_fld, req);
584 CERROR("Unconnected request\n");
585 req->rq_status = -ENOTCONN;
586 GOTO(out, rc = -ENOTCONN);
589 CERROR("Wrong opcode: %d\n", req->rq_reqmsg->opc);
590 req->rq_status = -ENOTSUPP;
591 rc = ptlrpc_error(req);
597 target_send_reply(req, rc, fail);
/* Bring up the server side of FLD on top of the dt_device @dt.
 * A reference is taken on the underlying lu_device, the server-side list and
 * its lock are initialised, the IAM backing store is set up through
 * fld_iam_init(), and a ptlrpc service using fld_req_handle() is configured.
 * When the service exists its threads are started. fld_server_fini() is
 * called from this function as well (presumably on the error path - TODO
 * confirm). */
602 fld_server_init(struct lu_server_fld *fld,
603 const struct lu_context *ctx,
604 struct dt_device *dt)
/* Service parameters: MDS buffer and size limits, requests on the FLD
 * portal, replies on the MDC reply portal. */
607 struct ptlrpc_service_conf fld_conf = {
608 .psc_nbufs = MDS_NBUFS,
609 .psc_bufsize = MDS_BUFSIZE,
610 .psc_max_req_size = MDS_MAXREQSIZE,
611 .psc_max_reply_size = MDS_MAXREPSIZE,
612 .psc_req_portal = MDS_FLD_PORTAL,
613 .psc_rep_portal = MDC_REPLY_PORTAL,
614 .psc_watchdog_timeout = FLD_SERVICE_WATCHDOG_TIMEOUT,
615 .psc_num_threads = FLD_NUM_THREADS
/* The matching lu_device_put() is in fld_server_fini(). */
620 lu_device_get(&dt->dd_lu_dev);
621 INIT_LIST_HEAD(&fld_list_head.fld_list);
622 spin_lock_init(&fld_list_head.fld_lock);
624 rc = fld_iam_init(fld, ctx);
628 ptlrpc_init_svc_conf(&fld_conf, fld_req_handle,
630 fld->fld_proc_entry, NULL);
631 if (fld->fld_service != NULL)
632 rc = ptlrpc_start_threads(NULL, fld->fld_service,
639 fld_server_fini(fld, ctx);
641 CDEBUG(D_INFO, "Server FLD initialized\n");
644 EXPORT_SYMBOL(fld_server_init);
/* Shut down the server side of FLD.
 * The ptlrpc service is unregistered if present and its pointer cleared, the
 * server-side list is emptied under its lock, and, when a dt device is
 * attached, the reference on its lu_device is dropped and the IAM backing
 * store is finalised with fld_iam_fini(). */
647 fld_server_fini(struct lu_server_fld *fld,
648 const struct lu_context *ctx)
650 struct list_head *pos, *n;
653 if (fld->fld_service != NULL) {
654 ptlrpc_unregister_service(fld->fld_service);
655 fld->fld_service = NULL;
658 spin_lock(&fld_list_head.fld_lock);
659 list_for_each_safe(pos, n, &fld_list_head.fld_list) {
/* NOTE(review): this local `fld` (a struct fld_item) shadows the function
 * parameter of the same name for the duration of the loop body. It is legal,
 * but easy to misread; a distinct name would be clearer. */
660 struct fld_item *fld = list_entry(pos, struct fld_item,
662 list_del_init(&fld->fld_list);
665 spin_unlock(&fld_list_head.fld_lock);
/* From here on `fld` is the lu_server_fld parameter again. */
666 if (fld->fld_dt != NULL) {
667 lu_device_put(&fld->fld_dt->dd_lu_dev);
668 fld_iam_fini(fld, ctx);
671 CDEBUG(D_INFO, "Server FLD finalized\n");
674 EXPORT_SYMBOL(fld_server_fini);
/* Module metadata and registration of the load/unload hooks.
 * NOTE(review): the module is registered under the name `mdd` although the
 * description says "Lustre FLD" - confirm whether `fld` was intended. */
676 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
677 MODULE_DESCRIPTION("Lustre FLD");
678 MODULE_LICENSE("GPL");
680 cfs_module(mdd, "0.0.4", fld_mod_init, fld_mod_exit);