1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fld/fld_handler.c
5 * FLD (Fids Location Database)
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_FLD
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
57 fld_cache_hash(__u64 seq)
63 fld_cache_insert(struct fld_cache_info *fld_cache,
66 struct fld_cache_entry *flde, *fldt;
67 struct hlist_head *bucket;
68 struct hlist_node *scan;
76 bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
77 fld_cache->fld_hash_mask);
79 spin_lock(&fld_cache->fld_lock);
80 hlist_for_each_entry(fldt, scan, bucket, fld_list) {
81 if (fldt->fld_seq == seq)
82 GOTO(exit_unlock, rc = -EEXIST);
85 INIT_HLIST_NODE(&flde->fld_list);
89 hlist_add_head(&flde->fld_list, bucket);
93 spin_unlock(&fld_cache->fld_lock);
100 fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
102 struct fld_cache_entry *flde;
103 struct hlist_head *bucket;
104 struct hlist_node *scan;
107 bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
108 fld_cache->fld_hash_mask);
110 spin_lock(&fld_cache->fld_lock);
111 hlist_for_each_entry(flde, scan, bucket, fld_list) {
112 if (flde->fld_seq == seq) {
113 hlist_del_init(&flde->fld_list);
120 spin_unlock(&fld_cache->fld_lock);
124 static struct fld_cache_entry *
125 fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
127 struct fld_cache_entry *flde;
128 struct hlist_head *bucket;
129 struct hlist_node *scan;
132 bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
133 fld_cache->fld_hash_mask);
135 spin_lock(&fld_cache->fld_lock);
136 hlist_for_each_entry(flde, scan, bucket, fld_list) {
137 if (flde->fld_seq == seq) {
138 spin_unlock(&fld_cache->fld_lock);
142 spin_unlock(&fld_cache->fld_lock);
148 fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
150 if (fld->fld_count == 0)
153 return do_div(seq, fld->fld_count);
157 fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
159 /* XXX: here should DHT hash */
160 return fld_rrb_hash(fld, seq);
163 struct lu_fld_hash fld_hash[3] = {
166 .fh_func = fld_dht_hash
169 .fh_name = "Round Robin",
170 .fh_func = fld_rrb_hash
177 static struct obd_export *
178 fld_client_get_export(struct lu_client_fld *fld, __u64 seq)
180 struct obd_export *fld_exp;
184 LASSERT(fld->fld_hash != NULL);
186 spin_lock(&fld->fld_lock);
187 hash = fld->fld_hash->fh_func(fld, seq);
188 spin_unlock(&fld->fld_lock);
190 spin_lock(&fld->fld_lock);
191 list_for_each_entry(fld_exp,
192 &fld->fld_exports, exp_fld_chain) {
194 spin_unlock(&fld->fld_lock);
199 spin_unlock(&fld->fld_lock);
203 /* add export to FLD. This is usually done by CMM and LMV as they are main users
206 fld_client_add_export(struct lu_client_fld *fld,
207 struct obd_export *exp)
209 struct obd_export *fld_exp;
212 LASSERT(exp != NULL);
214 CDEBUG(D_INFO|D_WARNING, "FLD(cli): adding export %s\n",
215 exp->exp_client_uuid.uuid);
217 spin_lock(&fld->fld_lock);
218 list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
219 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
220 &exp->exp_client_uuid))
222 spin_unlock(&fld->fld_lock);
227 fld_exp = class_export_get(exp);
228 list_add_tail(&fld_exp->exp_fld_chain,
232 spin_unlock(&fld->fld_lock);
236 EXPORT_SYMBOL(fld_client_add_export);
238 /* remove export from FLD */
240 fld_client_del_export(struct lu_client_fld *fld,
241 struct obd_export *exp)
243 struct obd_export *fld_exp;
244 struct obd_export *tmp;
247 spin_lock(&fld->fld_lock);
248 list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
249 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
250 &exp->exp_client_uuid))
253 list_del(&fld_exp->exp_fld_chain);
254 class_export_get(fld_exp);
256 spin_unlock(&fld->fld_lock);
260 spin_unlock(&fld->fld_lock);
263 EXPORT_SYMBOL(fld_client_del_export);
267 fld_client_proc_init(struct lu_client_fld *fld)
272 fld->fld_proc_dir = lprocfs_register(fld->fld_name,
276 if (IS_ERR(fld->fld_proc_dir)) {
277 CERROR("LProcFS failed in fld-init\n");
278 rc = PTR_ERR(fld->fld_proc_dir);
282 rc = lprocfs_add_vars(fld->fld_proc_dir,
283 fld_client_proc_list, fld);
285 CERROR("can't init FLD "
286 "proc, rc %d\n", rc);
293 fld->fld_proc_dir = NULL;
298 fld_client_proc_fini(struct lu_client_fld *fld)
301 if (fld->fld_proc_dir) {
302 lprocfs_remove(fld->fld_proc_dir);
303 fld->fld_proc_dir = NULL;
310 fld_client_init(struct lu_client_fld *fld,
311 const char *uuid, int hash)
316 LASSERT(fld != NULL);
318 if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
319 CERROR("wrong hash function 0x%x\n", hash);
323 INIT_LIST_HEAD(&fld->fld_exports);
324 spin_lock_init(&fld->fld_lock);
325 fld->fld_hash = &fld_hash[hash];
328 snprintf(fld->fld_name, sizeof(fld->fld_name),
329 "%s-%s", LUSTRE_FLD_NAME, uuid);
332 rc = fld_client_proc_init(fld);
339 fld_client_fini(fld);
341 CDEBUG(D_INFO|D_WARNING,
342 "Client FLD, using \"%s\" hash\n",
343 fld->fld_hash->fh_name);
346 EXPORT_SYMBOL(fld_client_init);
349 fld_client_fini(struct lu_client_fld *fld)
351 struct obd_export *fld_exp;
352 struct obd_export *tmp;
356 fld_client_proc_fini(fld);
359 spin_lock(&fld->fld_lock);
360 list_for_each_entry_safe(fld_exp, tmp,
361 &fld->fld_exports, exp_fld_chain) {
363 list_del(&fld_exp->exp_fld_chain);
364 class_export_get(fld_exp);
366 spin_unlock(&fld->fld_lock);
367 CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
370 EXPORT_SYMBOL(fld_client_fini);
373 fld_client_rpc(struct obd_export *exp,
374 struct md_fld *mf, __u32 fld_op)
376 int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
377 int mf_size = sizeof(struct md_fld);
378 struct ptlrpc_request *req;
383 LASSERT(exp != NULL);
385 req = ptlrpc_prep_req(class_exp2cliimp(exp),
386 LUSTRE_MDS_VERSION, FLD_QUERY,
391 op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
394 pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
395 memcpy(pmf, mf, sizeof(*mf));
397 req->rq_replen = lustre_msg_size(1, &mf_size);
398 req->rq_request_portal = MDS_FLD_PORTAL;
400 rc = ptlrpc_queue_wait(req);
404 pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
408 ptlrpc_req_finished(req);
413 fld_client_create(struct lu_client_fld *fld,
414 __u64 seq, mdsno_t mds)
416 struct obd_export *fld_exp;
417 struct md_fld md_fld;
421 fld_exp = fld_client_get_export(fld, seq);
427 rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
431 rc = fld_cache_insert(fld_cache, seq, mds);
436 EXPORT_SYMBOL(fld_client_create);
439 fld_client_delete(struct lu_client_fld *fld,
442 struct obd_export *fld_exp;
443 struct md_fld md_fld;
447 fld_cache_delete(fld_cache, seq);
450 fld_exp = fld_client_get_export(fld, seq);
457 rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
460 EXPORT_SYMBOL(fld_client_delete);
463 fld_client_get(struct lu_client_fld *fld,
464 __u64 seq, mdsno_t *mds)
466 struct obd_export *fld_exp;
467 struct md_fld md_fld;
471 fld_exp = fld_client_get_export(fld, seq);
476 rc = fld_client_rpc(fld_exp,
477 &md_fld, FLD_LOOKUP);
479 *mds = md_fld.mf_mds;
484 /* lookup fid in the namespace of pfid according to the name */
486 fld_client_lookup(struct lu_client_fld *fld,
487 __u64 seq, mdsno_t *mds)
490 struct fld_cache_entry *flde;
496 /* lookup it in the cache */
497 flde = fld_cache_lookup(fld_cache, seq);
499 *mds = flde->fld_mds;
504 /* can not find it in the cache */
505 rc = fld_client_get(fld, seq, mds);
511 rc = fld_cache_insert(fld_cache, seq, *mds);
516 EXPORT_SYMBOL(fld_client_lookup);