1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fld/fld_request.c
5 * FLD (Fids Location Database)
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_FLD
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
55 static int fld_rrb_hash(struct lu_client_fld *fld,
58 LASSERT(fld->lcf_count > 0);
59 return do_div(seq, fld->lcf_count);
62 static struct lu_fld_target *
63 fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
65 struct lu_fld_target *target;
69 hash = fld_rrb_hash(fld, seq);
71 list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
72 if (target->ft_idx == hash)
76 /* if target is not found, there is logical error anyway, so here is
77 * LBUG() to catch this situation. */
82 static int fld_dht_hash(struct lu_client_fld *fld,
85 /* XXX: here should be DHT hash */
86 return fld_rrb_hash(fld, seq);
89 static struct lu_fld_target *
90 fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
92 /* XXX: here should be DHT scan code */
93 return fld_rrb_scan(fld, seq);
96 struct lu_fld_hash fld_hash[3] = {
99 .fh_hash_func = fld_dht_hash,
100 .fh_scan_func = fld_dht_scan
104 .fh_hash_func = fld_rrb_hash,
105 .fh_scan_func = fld_rrb_scan
112 static struct lu_fld_target *
113 fld_client_get_target(struct lu_client_fld *fld,
116 struct lu_fld_target *target;
119 LASSERT(fld->lcf_hash != NULL);
121 spin_lock(&fld->lcf_lock);
122 target = fld->lcf_hash->fh_scan_func(fld, seq);
123 spin_unlock(&fld->lcf_lock);
129 * Add export to FLD. This is usually done by CMM and LMV as they are main users
132 int fld_client_add_target(struct lu_client_fld *fld,
133 struct lu_fld_target *tar)
135 const char *tar_name = fld_target_name(tar);
136 struct lu_fld_target *target, *tmp;
139 LASSERT(tar != NULL);
140 LASSERT(tar_name != NULL);
141 LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
143 CDEBUG(D_INFO|D_WARNING, "%s: adding target %s\n",
144 fld->lcf_name, tar_name);
146 OBD_ALLOC_PTR(target);
150 spin_lock(&fld->lcf_lock);
151 list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
152 const char *tmp_name = fld_target_name(tmp);
154 if (strlen(tar_name) == strlen(tmp_name) &&
155 strcmp(tmp_name, tar_name) == 0)
157 spin_unlock(&fld->lcf_lock);
158 OBD_FREE_PTR(target);
163 target->ft_exp = tar->ft_exp;
164 target->ft_srv = tar->ft_srv;
165 target->ft_idx = tar->ft_idx;
167 list_add_tail(&target->ft_chain,
171 spin_unlock(&fld->lcf_lock);
175 EXPORT_SYMBOL(fld_client_add_target);
177 /* remove export from FLD */
178 int fld_client_del_target(struct lu_client_fld *fld,
181 struct lu_fld_target *target, *tmp;
184 spin_lock(&fld->lcf_lock);
185 list_for_each_entry_safe(target, tmp,
186 &fld->lcf_targets, ft_chain) {
187 if (target->ft_idx == idx) {
189 list_del(&target->ft_chain);
190 spin_unlock(&fld->lcf_lock);
191 class_export_put(target->ft_exp);
192 OBD_FREE_PTR(target);
196 spin_unlock(&fld->lcf_lock);
199 EXPORT_SYMBOL(fld_client_del_target);
201 static void fld_client_proc_fini(struct lu_client_fld *fld);
204 static int fld_client_proc_init(struct lu_client_fld *fld)
209 fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
213 if (IS_ERR(fld->lcf_proc_dir)) {
214 CERROR("LProcFS failed in fld-init\n");
215 rc = PTR_ERR(fld->lcf_proc_dir);
219 rc = lprocfs_add_vars(fld->lcf_proc_dir,
220 fld_client_proc_list, fld);
222 CERROR("can't init FLD "
223 "proc, rc %d\n", rc);
224 GOTO(out_cleanup, rc);
230 fld_client_proc_fini(fld);
234 static void fld_client_proc_fini(struct lu_client_fld *fld)
237 if (fld->lcf_proc_dir) {
238 if (!IS_ERR(fld->lcf_proc_dir))
239 lprocfs_remove(fld->lcf_proc_dir);
240 fld->lcf_proc_dir = NULL;
245 static int fld_client_proc_init(struct lu_client_fld *fld)
250 static void fld_client_proc_fini(struct lu_client_fld *fld)
256 static inline int hash_is_sane(int hash)
258 return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
261 /* 1M of FLD cache will not hurt client a lot */
262 #define FLD_CACHE_SIZE 1024000
264 /* cache threshold is 10 percent of size */
265 #define FLD_CACHE_THRESHOLD 10
267 int fld_client_init(struct lu_client_fld *fld,
268 const char *prefix, int hash,
269 const struct lu_context *ctx)
272 int cache_size, cache_threshold;
277 LASSERT(fld != NULL);
279 if (!hash_is_sane(hash)) {
280 CERROR("Wrong hash function %#x\n", hash);
286 spin_lock_init(&fld->lcf_lock);
287 fld->lcf_hash = &fld_hash[hash];
288 INIT_LIST_HEAD(&fld->lcf_targets);
290 snprintf(fld->lcf_name, sizeof(fld->lcf_name),
291 "%s-cli-%s", LUSTRE_FLD_NAME, prefix);
294 cache_size = FLD_CACHE_SIZE /
295 sizeof(struct fld_cache_entry);
297 cache_threshold = cache_size *
298 FLD_CACHE_THRESHOLD / 100;
300 fld->lcf_cache = fld_cache_init(FLD_HTABLE_SIZE,
303 if (IS_ERR(fld->lcf_cache)) {
304 rc = PTR_ERR(fld->lcf_cache);
305 fld->lcf_cache = NULL;
310 rc = fld_client_proc_init(fld);
316 fld_client_fini(fld);
318 CDEBUG(D_INFO|D_WARNING,
319 "Client FLD, using \"%s\" hash\n",
320 fld->lcf_hash->fh_name);
323 EXPORT_SYMBOL(fld_client_init);
325 void fld_client_fini(struct lu_client_fld *fld)
327 struct lu_fld_target *target, *tmp;
330 fld_client_proc_fini(fld);
332 spin_lock(&fld->lcf_lock);
333 list_for_each_entry_safe(target, tmp,
334 &fld->lcf_targets, ft_chain) {
336 list_del(&target->ft_chain);
337 if (target->ft_exp != NULL)
338 class_export_put(target->ft_exp);
339 OBD_FREE_PTR(target);
341 spin_unlock(&fld->lcf_lock);
344 if (fld->lcf_cache != NULL) {
345 if (!IS_ERR(fld->lcf_cache))
346 fld_cache_fini(fld->lcf_cache);
347 fld->lcf_cache = NULL;
351 CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
354 EXPORT_SYMBOL(fld_client_fini);
356 static int fld_client_rpc(struct obd_export *exp,
357 struct md_fld *mf, __u32 fld_op)
359 int size[3] = { sizeof(struct ptlrpc_body),
361 sizeof(struct md_fld) };
362 struct ptlrpc_request *req;
363 struct req_capsule pill;
369 LASSERT(exp != NULL);
371 req = ptlrpc_prep_req(class_exp2cliimp(exp),
372 LUSTRE_MDS_VERSION, FLD_QUERY,
377 req_capsule_init(&pill, req, RCL_CLIENT, NULL);
379 req_capsule_set(&pill, &RQF_FLD_QUERY);
381 op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
384 pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
387 size[1] = sizeof(struct md_fld);
388 ptlrpc_req_set_repsize(req, 2, size);
389 req->rq_request_portal = FLD_REQUEST_PORTAL;
391 rc = ptlrpc_queue_wait(req);
395 pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
397 GOTO(out_req, rc = -EFAULT);
401 req_capsule_fini(&pill);
402 ptlrpc_req_finished(req);
406 int fld_client_create(struct lu_client_fld *fld,
407 seqno_t seq, mdsno_t mds)
409 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
410 struct lu_fld_target *target;
414 target = fld_client_get_target(fld, seq);
415 LASSERT(target != NULL);
418 if (target->ft_srv != NULL) {
419 LASSERT(fld->lcf_ctx != NULL);
420 rc = fld_server_create(target->ft_srv,
425 rc = fld_client_rpc(target->ft_exp,
426 &md_fld, FLD_CREATE);
433 * Do not return result of calling fld_cache_insert()
434 * here. First of all because it may return -EEXISTS. Another
435 * reason is that, we do not want to stop proceeding because of
436 * cache errors. --umka
438 fld_cache_insert(fld->lcf_cache, seq, mds);
442 EXPORT_SYMBOL(fld_client_create);
444 int fld_client_delete(struct lu_client_fld *fld,
447 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
448 struct lu_fld_target *target;
452 fld_cache_delete(fld->lcf_cache, seq);
454 target = fld_client_get_target(fld, seq);
455 LASSERT(target != NULL);
458 if (target->ft_srv != NULL) {
459 LASSERT(fld->lcf_ctx != NULL);
460 rc = fld_server_delete(target->ft_srv,
465 rc = fld_client_rpc(target->ft_exp,
466 &md_fld, FLD_DELETE);
473 EXPORT_SYMBOL(fld_client_delete);
475 int fld_client_lookup(struct lu_client_fld *fld,
476 seqno_t seq, mdsno_t *mds)
478 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
479 struct lu_fld_target *target;
483 /* lookup it in the cache */
484 rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
488 /* can not find it in the cache */
489 target = fld_client_get_target(fld, seq);
490 LASSERT(target != NULL);
493 if (target->ft_srv != NULL) {
494 LASSERT(fld->lcf_ctx != NULL);
495 rc = fld_server_lookup(target->ft_srv,
500 rc = fld_client_rpc(target->ft_exp,
501 &md_fld, FLD_LOOKUP);
506 *mds = md_fld.mf_mds;
509 * Do not return error here as well. See previous comment in
510 * same situation in function fld_client_create(). --umka
512 fld_cache_insert(fld->lcf_cache, seq, *mds);
516 EXPORT_SYMBOL(fld_client_lookup);