1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fld/fld_request.c
5 * FLD (Fids Location Database)
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_FLD
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
55 static int fld_rrb_hash(struct lu_client_fld *fld,
58 LASSERT(fld->fld_count > 0);
59 return do_div(seq, fld->fld_count);
62 static struct fld_target *
63 fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
65 struct fld_target *target;
69 hash = fld_rrb_hash(fld, seq);
71 list_for_each_entry(target, &fld->fld_targets, fldt_chain) {
72 if (target->fldt_idx == hash)
76 /* if target is not found, there is logical error anyway, so here is
77 * LBUG() to catch this situation. */
82 static int fld_dht_hash(struct lu_client_fld *fld,
85 /* XXX: here should be DHT hash */
86 return fld_rrb_hash(fld, seq);
89 static struct fld_target *
90 fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
92 /* XXX: here should be DHT scan code */
93 return fld_dht_scan(fld, seq);
96 struct lu_fld_hash fld_hash[3] = {
99 .fh_hash_func = fld_dht_hash,
100 .fh_scan_func = fld_dht_scan
104 .fh_hash_func = fld_rrb_hash,
105 .fh_scan_func = fld_rrb_scan
112 static struct fld_target *
113 fld_client_get_target(struct lu_client_fld *fld,
116 struct fld_target *target;
119 LASSERT(fld->fld_hash != NULL);
121 spin_lock(&fld->fld_lock);
122 target = fld->fld_hash->fh_scan_func(fld, seq);
123 spin_unlock(&fld->fld_lock);
129 * Add export to FLD. This is usually done by CMM and LMV as they are main users
132 int fld_client_add_target(struct lu_client_fld *fld,
133 struct obd_export *exp)
135 struct client_obd *cli = &exp->exp_obd->u.cli;
136 struct fld_target *target, *tmp;
139 LASSERT(exp != NULL);
141 CDEBUG(D_INFO|D_WARNING, "%s: adding export %s\n",
142 fld->fld_name, cli->cl_target_uuid.uuid);
144 OBD_ALLOC_PTR(target);
148 spin_lock(&fld->fld_lock);
149 list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
150 if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
151 &exp->exp_client_uuid))
153 spin_unlock(&fld->fld_lock);
154 OBD_FREE_PTR(target);
159 target->fldt_exp = class_export_get(exp);
160 target->fldt_idx = fld->fld_count;
162 list_add_tail(&target->fldt_chain,
165 spin_unlock(&fld->fld_lock);
169 EXPORT_SYMBOL(fld_client_add_target);
171 /* remove export from FLD */
172 int fld_client_del_target(struct lu_client_fld *fld,
173 struct obd_export *exp)
175 struct fld_target *target, *tmp;
178 spin_lock(&fld->fld_lock);
179 list_for_each_entry_safe(target, tmp,
180 &fld->fld_targets, fldt_chain) {
181 if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
182 &exp->exp_client_uuid))
185 list_del(&target->fldt_chain);
186 spin_unlock(&fld->fld_lock);
187 class_export_put(target->fldt_exp);
188 OBD_FREE_PTR(target);
192 spin_unlock(&fld->fld_lock);
195 EXPORT_SYMBOL(fld_client_del_target);
197 static void fld_client_proc_fini(struct lu_client_fld *fld);
200 static int fld_client_proc_init(struct lu_client_fld *fld)
205 fld->fld_proc_dir = lprocfs_register(fld->fld_name,
209 if (IS_ERR(fld->fld_proc_dir)) {
210 CERROR("LProcFS failed in fld-init\n");
211 rc = PTR_ERR(fld->fld_proc_dir);
215 rc = lprocfs_add_vars(fld->fld_proc_dir,
216 fld_client_proc_list, fld);
218 CERROR("can't init FLD "
219 "proc, rc %d\n", rc);
220 GOTO(out_cleanup, rc);
226 fld_client_proc_fini(fld);
230 static void fld_client_proc_fini(struct lu_client_fld *fld)
233 if (fld->fld_proc_dir) {
234 if (!IS_ERR(fld->fld_proc_dir))
235 lprocfs_remove(fld->fld_proc_dir);
236 fld->fld_proc_dir = NULL;
241 static int fld_client_proc_init(struct lu_client_fld *fld)
246 static void fld_client_proc_fini(struct lu_client_fld *fld)
252 static inline int hash_is_sane(int hash)
254 return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
257 /* 1M of FLD cache will not hurt client a lot */
258 #define FLD_CACHE_SIZE 1024000
260 /* cache threshold is 10 percent of size */
261 #define FLD_CACHE_THRESHOLD 10
263 int fld_client_init(struct lu_client_fld *fld,
264 const char *uuid, int hash)
267 int cache_size, cache_threshold;
272 LASSERT(fld != NULL);
274 if (!hash_is_sane(hash)) {
275 CERROR("wrong hash function %#x\n", hash);
279 INIT_LIST_HEAD(&fld->fld_targets);
280 spin_lock_init(&fld->fld_lock);
281 fld->fld_hash = &fld_hash[hash];
284 snprintf(fld->fld_name, sizeof(fld->fld_name),
285 "%s-cli-%s", LUSTRE_FLD_NAME, uuid);
288 cache_size = FLD_CACHE_SIZE /
289 sizeof(struct fld_cache_entry);
291 cache_threshold = cache_size *
292 FLD_CACHE_THRESHOLD / 100;
294 fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE,
297 if (IS_ERR(fld->fld_cache)) {
298 rc = PTR_ERR(fld->fld_cache);
299 fld->fld_cache = NULL;
304 rc = fld_client_proc_init(fld);
310 fld_client_fini(fld);
312 CDEBUG(D_INFO|D_WARNING,
313 "Client FLD, using \"%s\" hash\n",
314 fld->fld_hash->fh_name);
317 EXPORT_SYMBOL(fld_client_init);
319 void fld_client_fini(struct lu_client_fld *fld)
321 struct fld_target *target, *tmp;
324 fld_client_proc_fini(fld);
326 spin_lock(&fld->fld_lock);
327 list_for_each_entry_safe(target, tmp,
328 &fld->fld_targets, fldt_chain) {
330 list_del(&target->fldt_chain);
331 class_export_put(target->fldt_exp);
332 OBD_FREE_PTR(target);
334 spin_unlock(&fld->fld_lock);
337 if (fld->fld_cache != NULL) {
338 if (!IS_ERR(fld->fld_cache))
339 fld_cache_fini(fld->fld_cache);
340 fld->fld_cache = NULL;
344 CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
347 EXPORT_SYMBOL(fld_client_fini);
349 static int fld_client_rpc(struct obd_export *exp,
350 struct md_fld *mf, __u32 fld_op)
352 int reqsize[3] = { sizeof(struct ptlrpc_body),
354 sizeof(struct md_fld) };
355 int repsize[2] = { sizeof(struct ptlrpc_body),
356 sizeof(struct md_fld) };
357 struct ptlrpc_request *req;
358 struct req_capsule pill;
364 LASSERT(exp != NULL);
366 req = ptlrpc_prep_req(class_exp2cliimp(exp),
367 LUSTRE_MDS_VERSION, FLD_QUERY,
372 req_capsule_init(&pill, req, RCL_CLIENT, NULL);
374 req_capsule_set(&pill, &RQF_FLD_QUERY);
376 op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
379 pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
382 ptlrpc_req_set_repsize(req, 2, repsize);
383 req->rq_request_portal = FLD_REQUEST_PORTAL;
385 rc = ptlrpc_queue_wait(req);
389 pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
391 GOTO(out_req, rc = -EFAULT);
395 req_capsule_fini(&pill);
396 ptlrpc_req_finished(req);
400 int fld_client_create(struct lu_client_fld *fld,
401 seqno_t seq, mdsno_t mds)
403 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
404 struct fld_target *target;
408 target = fld_client_get_target(fld, seq);
409 LASSERT(target != NULL);
411 rc = fld_client_rpc(target->fldt_exp, &md_fld, FLD_CREATE);
415 * Do not return result of calling fld_cache_insert()
416 * here. First of all because it may return -EEXISTS. Another
417 * reason is that, we do not want to stop proceeding because of
418 * cache errors. --umka
420 fld_cache_insert(fld->fld_cache, seq, mds);
424 EXPORT_SYMBOL(fld_client_create);
426 int fld_client_delete(struct lu_client_fld *fld,
429 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
430 struct fld_target *target;
434 fld_cache_delete(fld->fld_cache, seq);
436 target = fld_client_get_target(fld, seq);
437 LASSERT(target != NULL);
439 rc = fld_client_rpc(target->fldt_exp,
440 &md_fld, FLD_DELETE);
444 EXPORT_SYMBOL(fld_client_delete);
446 int fld_client_lookup(struct lu_client_fld *fld,
447 seqno_t seq, mdsno_t *mds)
449 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
450 struct fld_target *target;
454 /* lookup it in the cache */
455 rc = fld_cache_lookup(fld->fld_cache, seq, mds);
459 /* can not find it in the cache */
460 target = fld_client_get_target(fld, seq);
461 LASSERT(target != NULL);
463 rc = fld_client_rpc(target->fldt_exp,
464 &md_fld, FLD_LOOKUP);
466 *mds = md_fld.mf_mds;
469 * Do not return error here as well. See previous comment in
470 * same situation in function fld_client_create(). --umka
472 fld_cache_insert(fld->fld_cache, seq, *mds);
476 EXPORT_SYMBOL(fld_client_lookup);