1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fld/fld_request.c
5 * FLD (Fids Location Database)
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_FLD
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include <lustre_mdc.h>
54 #include "fld_internal.h"
56 static int fld_rrb_hash(struct lu_client_fld *fld,
59 LASSERT(fld->lcf_count > 0);
60 return do_div(seq, fld->lcf_count);
63 static struct lu_fld_target *
64 fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
66 struct lu_fld_target *target;
70 hash = fld_rrb_hash(fld, seq);
72 list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
73 if (target->ft_idx == hash)
77 CERROR("%s: Can't find target by hash %d (seq "LPX64"). "
78 "Targets (%d):\n", fld->lcf_name, hash, seq,
81 list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
82 const char *srv_name = target->ft_srv != NULL ?
83 target->ft_srv->lsf_name : "<null>";
84 const char *exp_name = target->ft_exp != NULL ?
85 (char *)target->ft_exp->exp_obd->obd_uuid.uuid :
88 CERROR(" exp: 0x%p (%s), srv: 0x%p (%s), idx: "LPU64"\n",
89 target->ft_exp, exp_name, target->ft_srv,
90 srv_name, target->ft_idx);
94 * If target is not found, there is logical error anyway, so here is
95 * LBUG() to catch this situation.
101 static int fld_dht_hash(struct lu_client_fld *fld,
104 /* XXX: here should be DHT hash */
105 return fld_rrb_hash(fld, seq);
108 static struct lu_fld_target *
109 fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
111 /* XXX: here should be DHT scan code */
112 return fld_rrb_scan(fld, seq);
115 struct lu_fld_hash fld_hash[3] = {
118 .fh_hash_func = fld_dht_hash,
119 .fh_scan_func = fld_dht_scan
123 .fh_hash_func = fld_rrb_hash,
124 .fh_scan_func = fld_rrb_scan
131 static struct lu_fld_target *
132 fld_client_get_target(struct lu_client_fld *fld,
135 struct lu_fld_target *target;
138 LASSERT(fld->lcf_hash != NULL);
140 spin_lock(&fld->lcf_lock);
141 target = fld->lcf_hash->fh_scan_func(fld, seq);
142 spin_unlock(&fld->lcf_lock);
144 if (target != NULL) {
145 CDEBUG(D_INFO, "%s: Found target (idx "LPU64
146 ") by seq "LPX64"\n", fld->lcf_name,
147 target->ft_idx, seq);
154 * Add export to FLD. This is usually done by CMM and LMV as they are main users
157 int fld_client_add_target(struct lu_client_fld *fld,
158 struct lu_fld_target *tar)
160 const char *name = fld_target_name(tar);
161 struct lu_fld_target *target, *tmp;
164 LASSERT(tar != NULL);
165 LASSERT(name != NULL);
166 LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
168 if (fld->lcf_flags != LUSTRE_FLD_INIT) {
169 CERROR("%s: Attempt to add target %s (idx "LPU64") "
170 "on fly - skip it\n", fld->lcf_name, name,
174 CDEBUG(D_INFO, "%s: Adding target %s (idx "
175 LPU64")\n", fld->lcf_name, name, tar->ft_idx);
178 OBD_ALLOC_PTR(target);
182 spin_lock(&fld->lcf_lock);
183 list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
184 if (tmp->ft_idx == tar->ft_idx) {
185 spin_unlock(&fld->lcf_lock);
186 OBD_FREE_PTR(target);
187 CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n",
188 name, fld_target_name(tmp), tmp->ft_idx);
193 target->ft_exp = tar->ft_exp;
194 if (target->ft_exp != NULL)
195 class_export_get(target->ft_exp);
196 target->ft_srv = tar->ft_srv;
197 target->ft_idx = tar->ft_idx;
199 list_add_tail(&target->ft_chain,
203 spin_unlock(&fld->lcf_lock);
207 EXPORT_SYMBOL(fld_client_add_target);
209 /* Remove export from FLD */
210 int fld_client_del_target(struct lu_client_fld *fld,
213 struct lu_fld_target *target, *tmp;
216 spin_lock(&fld->lcf_lock);
217 list_for_each_entry_safe(target, tmp,
218 &fld->lcf_targets, ft_chain) {
219 if (target->ft_idx == idx) {
221 list_del(&target->ft_chain);
222 spin_unlock(&fld->lcf_lock);
224 if (target->ft_exp != NULL)
225 class_export_put(target->ft_exp);
227 OBD_FREE_PTR(target);
231 spin_unlock(&fld->lcf_lock);
234 EXPORT_SYMBOL(fld_client_del_target);
236 static void fld_client_proc_fini(struct lu_client_fld *fld);
239 static int fld_client_proc_init(struct lu_client_fld *fld)
244 fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
248 if (IS_ERR(fld->lcf_proc_dir)) {
249 CERROR("%s: LProcFS failed in fld-init\n",
251 rc = PTR_ERR(fld->lcf_proc_dir);
255 rc = lprocfs_add_vars(fld->lcf_proc_dir,
256 fld_client_proc_list, fld);
258 CERROR("%s: Can't init FLD proc, rc %d\n",
260 GOTO(out_cleanup, rc);
266 fld_client_proc_fini(fld);
270 static void fld_client_proc_fini(struct lu_client_fld *fld)
273 if (fld->lcf_proc_dir) {
274 if (!IS_ERR(fld->lcf_proc_dir))
275 lprocfs_remove(fld->lcf_proc_dir);
276 fld->lcf_proc_dir = NULL;
281 static int fld_client_proc_init(struct lu_client_fld *fld)
286 static void fld_client_proc_fini(struct lu_client_fld *fld)
292 static inline int hash_is_sane(int hash)
294 return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
297 int fld_client_init(struct lu_client_fld *fld,
298 const char *prefix, int hash)
301 int cache_size, cache_threshold;
306 LASSERT(fld != NULL);
308 snprintf(fld->lcf_name, sizeof(fld->lcf_name),
311 if (!hash_is_sane(hash)) {
312 CERROR("%s: Wrong hash function %#x\n",
313 fld->lcf_name, hash);
318 spin_lock_init(&fld->lcf_lock);
319 fld->lcf_hash = &fld_hash[hash];
320 fld->lcf_flags = LUSTRE_FLD_INIT;
321 INIT_LIST_HEAD(&fld->lcf_targets);
324 cache_size = FLD_CLIENT_CACHE_SIZE /
325 sizeof(struct fld_cache_entry);
327 cache_threshold = cache_size *
328 FLD_CLIENT_CACHE_THRESHOLD / 100;
330 fld->lcf_cache = fld_cache_init(fld->lcf_name,
331 FLD_CLIENT_HTABLE_SIZE,
332 cache_size, cache_threshold);
333 if (IS_ERR(fld->lcf_cache)) {
334 rc = PTR_ERR(fld->lcf_cache);
335 fld->lcf_cache = NULL;
340 rc = fld_client_proc_init(fld);
346 fld_client_fini(fld);
348 CDEBUG(D_INFO, "%s: Using \"%s\" hash\n",
349 fld->lcf_name, fld->lcf_hash->fh_name);
352 EXPORT_SYMBOL(fld_client_init);
354 void fld_client_fini(struct lu_client_fld *fld)
356 struct lu_fld_target *target, *tmp;
359 fld_client_proc_fini(fld);
361 spin_lock(&fld->lcf_lock);
362 list_for_each_entry_safe(target, tmp,
363 &fld->lcf_targets, ft_chain) {
365 list_del(&target->ft_chain);
366 if (target->ft_exp != NULL)
367 class_export_put(target->ft_exp);
368 OBD_FREE_PTR(target);
370 spin_unlock(&fld->lcf_lock);
373 if (fld->lcf_cache != NULL) {
374 if (!IS_ERR(fld->lcf_cache))
375 fld_cache_fini(fld->lcf_cache);
376 fld->lcf_cache = NULL;
382 EXPORT_SYMBOL(fld_client_fini);
384 static int fld_client_rpc(struct obd_export *exp,
385 struct md_fld *mf, __u32 fld_op)
387 int size[3] = { sizeof(struct ptlrpc_body),
389 sizeof(struct md_fld) };
390 struct ptlrpc_request *req;
391 struct req_capsule pill;
397 LASSERT(exp != NULL);
399 req = ptlrpc_prep_req(class_exp2cliimp(exp),
406 req_capsule_init(&pill, req, RCL_CLIENT, NULL);
407 req_capsule_set(&pill, &RQF_FLD_QUERY);
409 op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
412 pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
415 size[1] = sizeof(struct md_fld);
416 ptlrpc_req_set_repsize(req, 2, size);
417 req->rq_request_portal = FLD_REQUEST_PORTAL;
419 if (fld_op != FLD_LOOKUP)
420 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
421 rc = ptlrpc_queue_wait(req);
422 if (fld_op != FLD_LOOKUP)
423 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
427 pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
429 GOTO(out_req, rc = -EFAULT);
433 req_capsule_fini(&pill);
434 ptlrpc_req_finished(req);
438 int fld_client_create(struct lu_client_fld *fld,
439 seqno_t seq, mdsno_t mds,
440 const struct lu_env *env)
442 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
443 struct lu_fld_target *target;
447 fld->lcf_flags |= LUSTRE_FLD_RUN;
448 target = fld_client_get_target(fld, seq);
449 LASSERT(target != NULL);
451 CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: "
452 LPU64") on target %s (idx "LPU64")\n", fld->lcf_name,
453 seq, mds, fld_target_name(target), target->ft_idx);
456 if (target->ft_srv != NULL) {
457 LASSERT(env != NULL);
458 rc = fld_server_create(target->ft_srv,
462 rc = fld_client_rpc(target->ft_exp,
463 &md_fld, FLD_CREATE);
470 * Do not return result of calling fld_cache_insert()
471 * here. First of all because it may return -EEXISTS. Another
472 * reason is that, we do not want to stop proceeding because of
475 fld_cache_insert(fld->lcf_cache, seq, mds);
477 CERROR("%s: Can't create FLD entry, rc %d\n",
483 EXPORT_SYMBOL(fld_client_create);
485 int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
486 const struct lu_env *env)
488 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
489 struct lu_fld_target *target;
493 fld->lcf_flags |= LUSTRE_FLD_RUN;
494 fld_cache_delete(fld->lcf_cache, seq);
496 target = fld_client_get_target(fld, seq);
497 LASSERT(target != NULL);
499 CDEBUG(D_INFO, "%s: Delete fld entry (seq: "LPX64") on "
500 "target %s (idx "LPU64")\n", fld->lcf_name, seq,
501 fld_target_name(target), target->ft_idx);
504 if (target->ft_srv != NULL) {
505 LASSERT(env != NULL);
506 rc = fld_server_delete(target->ft_srv,
510 rc = fld_client_rpc(target->ft_exp,
511 &md_fld, FLD_DELETE);
518 EXPORT_SYMBOL(fld_client_delete);
520 int fld_client_lookup(struct lu_client_fld *fld,
521 seqno_t seq, mdsno_t *mds,
522 const struct lu_env *env)
524 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
525 struct lu_fld_target *target;
529 fld->lcf_flags |= LUSTRE_FLD_RUN;
531 rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
535 /* Can not find it in the cache */
536 target = fld_client_get_target(fld, seq);
537 LASSERT(target != NULL);
539 CDEBUG(D_INFO, "%s: Lookup fld entry (seq: "LPX64") on "
540 "target %s (idx "LPU64")\n", fld->lcf_name, seq,
541 fld_target_name(target), target->ft_idx);
544 if (target->ft_srv != NULL) {
545 LASSERT(env != NULL);
546 rc = fld_server_lookup(target->ft_srv,
547 env, seq, &md_fld.mf_mds);
550 rc = fld_client_rpc(target->ft_exp,
551 &md_fld, FLD_LOOKUP);
556 *mds = md_fld.mf_mds;
559 * Do not return error here as well. See previous comment in
560 * same situation in function fld_client_create().
562 fld_cache_insert(fld->lcf_cache, seq, *mds);
566 EXPORT_SYMBOL(fld_client_lookup);