1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fld/fld_request.c
5 * FLD (Fids Location Database)
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_FLD
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
/* round-robin hash over the registered targets. A zero fld->fld_count is
 * checked before dividing; otherwise the result of
 * do_div(seq, fld->fld_count) is returned. The second parameter line and the
 * statement under the zero check are missing from this listing. */
55 static int fld_rrb_hash(struct lu_client_fld *fld,
58 if (fld->fld_count == 0)
/* NOTE(review): do_div() presumably divides its first argument in place and
 * evaluates to the remainder, which would make this "seq modulo target
 * count" -- confirm against the platform's div64.h. */
61 return do_div(seq, fld->fld_count);
/* DHT hash entry point. No DHT algorithm is implemented yet: the call is
 * simply forwarded to fld_rrb_hash(), so both hash types currently behave
 * the same. */
64 static int fld_dht_hash(struct lu_client_fld *fld,
67 /* XXX: here should be DHT hash */
68 return fld_rrb_hash(fld, seq);
/* table of hash descriptors; fld_client_init() picks one with
 * &fld_hash[hash]. The array is declared with three slots, but only the
 * initializer lines below survive in this listing: a descriptor using
 * fld_dht_hash and the "Round Robin" descriptor using fld_rrb_hash. */
71 struct lu_fld_hash fld_hash[3] = {
74 .fh_func = fld_dht_hash
77 .fh_name = "Round Robin",
78 .fh_func = fld_rrb_hash
85 /* this function decides whether the passed @target is appropriate for the
86 * passed @hash. In the case of the usual round-robin hash, this is decided by
87 * comparing the hash and the target's index. In the case of DHT, the algorithm is a bit more
/* returns non-zero when @target's index (fldt_idx) equals the passed hash
 * value, zero otherwise. The second parameter line is missing from this
 * listing. */
89 static int fld_client_apt_target(struct fld_target *target,
92 /* XXX: DHT case should be worked out. */
93 return (target->fldt_idx == hash);
/* find the target responsible for a sequence. The configured hash function
 * (fld->fld_hash->fh_func) is applied to the sequence, then fld_targets is
 * walked until fld_client_apt_target() accepts an entry. Both the hash
 * computation and the list walk run with fld_lock held. The return
 * statements and some declarations are missing from this listing. */
96 static struct fld_target *
97 fld_client_get_target(struct lu_client_fld *fld,
100 struct fld_target *target;
104 LASSERT(fld->fld_hash != NULL);
106 spin_lock(&fld->fld_lock);
107 hash = fld->fld_hash->fh_func(fld, seq);
109 list_for_each_entry(target,
110 &fld->fld_targets, fldt_chain) {
111 if (fld_client_apt_target(target, hash)) {
/* match found: the lock is released inside the loop body */
112 spin_unlock(&fld->fld_lock);
/* no entry matched: release the lock taken before the walk */
116 spin_unlock(&fld->fld_lock);
118 /* if target is not found, there is logical error anyway, so here is
119 * LBUG() to catch that situation. */
124 /* add export to FLD. This is usually done by CMM and LMV as they are main users
/* register @exp as a new target of @fld. A fld_target is allocated before
 * the lock is taken. The target list is then scanned under fld_lock for an
 * entry whose export carries the same client uuid; on a match the lock is
 * dropped and the fresh allocation is freed. Otherwise the new target stores
 * the result of class_export_get(exp), takes the current fld->fld_count as
 * its index and is linked in with list_add_tail(). Return statements and a
 * few other lines are missing from this listing. */
126 int fld_client_add_target(struct lu_client_fld *fld,
127 struct obd_export *exp)
/* NOTE(review): @exp is dereferenced in this initializer, i.e. before the
 * LASSERT(exp != NULL) below can catch a NULL export. */
129 struct client_obd *cli = &exp->exp_obd->u.cli;
130 struct fld_target *target, *tmp;
133 LASSERT(exp != NULL);
135 CDEBUG(D_INFO|D_WARNING, "%s: adding export %s\n",
136 fld->fld_name, cli->cl_target_uuid.uuid);
138 OBD_ALLOC_PTR(target);
142 spin_lock(&fld->fld_lock);
/* duplicate check: compare client uuids of already registered exports */
143 list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
144 if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
145 &exp->exp_client_uuid))
147 spin_unlock(&fld->fld_lock);
148 OBD_FREE_PTR(target);
153 target->fldt_exp = class_export_get(exp);
/* the index is the number of targets counted so far; fld_client_apt_target()
 * compares the hash value against this index */
154 target->fldt_idx = fld->fld_count;
156 list_add_tail(&target->fldt_chain,
159 spin_unlock(&fld->fld_lock);
163 EXPORT_SYMBOL(fld_client_add_target);
165 /* remove export from FLD: walk fld_targets under fld_lock looking for the
 * entry whose export has the same client uuid as @exp. The matching entry is
 * unlinked, the lock is dropped, and only then is the export put and the
 * target freed. If the walk ends without a match the lock is released after
 * the loop. Return statements are missing from this listing. */
166 int fld_client_del_target(struct lu_client_fld *fld,
167 struct obd_export *exp)
169 struct fld_target *target, *tmp;
172 spin_lock(&fld->fld_lock);
173 list_for_each_entry_safe(target, tmp,
174 &fld->fld_targets, fldt_chain) {
175 if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
176 &exp->exp_client_uuid))
179 list_del(&target->fldt_chain);
180 spin_unlock(&fld->fld_lock);
181 class_export_put(target->fldt_exp);
182 OBD_FREE_PTR(target);
186 spin_unlock(&fld->fld_lock);
189 EXPORT_SYMBOL(fld_client_del_target);
/* create the procfs directory for this client FLD (named after
 * fld->fld_name) and populate it with the entries of fld_client_proc_list.
 * The trailing lines remove the directory again and reset fld_proc_dir to
 * NULL; presumably that is the error path, but the label and condition are
 * missing from this listing. */
192 static int fld_client_proc_init(struct lu_client_fld *fld)
197 fld->fld_proc_dir = lprocfs_register(fld->fld_name,
/* NOTE(review): no visible line in this branch resets fld_proc_dir. If it is
 * left holding an error pointer, the plain non-NULL test in
 * fld_client_proc_fini() would hand it to lprocfs_remove() -- verify. */
201 if (IS_ERR(fld->fld_proc_dir)) {
202 CERROR("LProcFS failed in fld-init\n");
203 rc = PTR_ERR(fld->fld_proc_dir);
207 rc = lprocfs_add_vars(fld->fld_proc_dir,
208 fld_client_proc_list, fld);
210 CERROR("can't init FLD "
211 "proc, rc %d\n", rc);
218 lprocfs_remove(fld->fld_proc_dir);
220 fld->fld_proc_dir = NULL;
/* tear down the procfs directory, if one is recorded, and clear the pointer
 * so that a second call does nothing. */
224 static void fld_client_proc_fini(struct lu_client_fld *fld)
227 if (fld->fld_proc_dir) {
228 lprocfs_remove(fld->fld_proc_dir);
229 fld->fld_proc_dir = NULL;
/* second definition of fld_client_proc_init() in this file; presumably the
 * variant built when procfs support is compiled out. Neither the
 * preprocessor guard nor the body is visible in this listing -- confirm. */
234 static int fld_client_proc_init(struct lu_client_fld *fld)
/* second definition of fld_client_proc_fini() in this file; presumably the
 * variant built when procfs support is compiled out. Neither the
 * preprocessor guard nor the body is visible in this listing -- confirm. */
239 static void fld_client_proc_fini(struct lu_client_fld *fld)
/* returns non-zero when @hash is a valid index into fld_hash[], i.e. it is
 * non-negative and below the number of array slots.
 * NOTE(review): fld_hash is declared with three slots while only two
 * initialized descriptors are visible in this listing. If the last slot is
 * an empty terminator, hash == 2 passes this check and would select a
 * descriptor without a hash function -- verify. */
245 static inline int hash_is_sane(int hash)
247 return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
/* initialize a client FLD instance.
 *
 * @fld:  instance to set up, must not be NULL.
 * @uuid: string embedded into the instance name.
 * @hash: index into fld_hash[], validated with hash_is_sane().
 *
 * Sets up the (empty) target list, the lock and the selected hash
 * descriptor, builds the instance name, creates the lookup cache and the
 * procfs entries. fld_client_fini() is called from here as well, presumably
 * only when one of the steps failed; the condition and the return statements
 * are missing from this listing. */
250 int fld_client_init(struct lu_client_fld *fld,
251 const char *uuid, int hash)
256 LASSERT(fld != NULL);
258 if (!hash_is_sane(hash)) {
259 CERROR("wrong hash function 0x%x\n", hash);
263 INIT_LIST_HEAD(&fld->fld_targets);
264 spin_lock_init(&fld->fld_lock);
265 fld->fld_hash = &fld_hash[hash];
/* instance name has the form "<LUSTRE_FLD_NAME>-cli-<uuid>", bounded by the
 * size of fld_name */
268 snprintf(fld->fld_name, sizeof(fld->fld_name),
269 "%s-cli-%s", LUSTRE_FLD_NAME, uuid);
272 fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
273 if (IS_ERR(fld->fld_cache)) {
274 rc = PTR_ERR(fld->fld_cache);
/* reset to NULL so that the fld_cache != NULL test in fld_client_fini() does
 * not see the error pointer */
275 fld->fld_cache = NULL;
280 rc = fld_client_proc_init(fld);
286 fld_client_fini(fld);
288 CDEBUG(D_INFO|D_WARNING,
289 "Client FLD, using \"%s\" hash\n",
290 fld->fld_hash->fh_name);
293 EXPORT_SYMBOL(fld_client_init);
/* release everything owned by a client FLD instance: the procfs entries,
 * every registered target (unlinked, export put, memory freed) and the
 * lookup cache if one exists. */
295 void fld_client_fini(struct lu_client_fld *fld)
297 struct fld_target *target, *tmp;
300 fld_client_proc_fini(fld);
302 spin_lock(&fld->fld_lock);
/* NOTE(review): class_export_put() runs with fld_lock held here, whereas
 * fld_client_del_target() drops the lock before putting the export --
 * confirm the difference is intended. */
303 list_for_each_entry_safe(target, tmp,
304 &fld->fld_targets, fldt_chain) {
306 list_del(&target->fldt_chain);
307 class_export_put(target->fldt_exp);
308 OBD_FREE_PTR(target);
310 spin_unlock(&fld->fld_lock);
313 if (fld->fld_cache != NULL) {
314 fld_cache_fini(fld->fld_cache);
315 fld->fld_cache = NULL;
319 CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
322 EXPORT_SYMBOL(fld_client_fini);
/* send one FLD_QUERY request over @exp and wait for the reply.
 *
 * @exp:    export to send through, must not be NULL.
 * @mf:     md_fld record for the request (how it is copied in and out is not
 *          visible in this listing).
 * @fld_op: operation code for the request.
 *
 * The request and the capsule are released at the end of the function. Some
 * lines, including error checks and the out_req label itself, are missing
 * from this listing. */
324 static int fld_client_rpc(struct obd_export *exp,
325 struct md_fld *mf, __u32 fld_op)
/* request buffer sizes: one __u32 and one struct md_fld */
327 int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
/* reply buffer size: one struct md_fld */
328 int mf_size = sizeof(struct md_fld);
329 struct ptlrpc_request *req;
330 struct req_capsule pill;
335 LASSERT(exp != NULL);
337 req = ptlrpc_prep_req(class_exp2cliimp(exp),
338 LUSTRE_MDS_VERSION, FLD_QUERY,
343 req_capsule_init(&pill, req, RCL_CLIENT, NULL);
345 req_capsule_set(&pill, &RQF_FLD_QUERY);
/* locate the opcode and md_fld fields inside the request message */
347 op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
350 pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
353 req->rq_replen = lustre_msg_size(1, &mf_size);
354 req->rq_request_portal = FLD_REQUEST_PORTAL;
356 rc = ptlrpc_queue_wait(req);
/* locate the md_fld field inside the reply message */
360 pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
/* the condition guarding this jump is not visible; presumably a NULL pmf */
362 GOTO(out_req, rc = -EFAULT);
366 req_capsule_fini(&pill);
367 ptlrpc_req_finished(req);
/* create the seq -> mds mapping: pick the target responsible for @seq, send
 * it an FLD_CREATE request carrying @md_fld, then record the mapping in the
 * local cache. The checks between these steps and the return statement are
 * missing from this listing. */
371 static int __fld_client_create(struct lu_client_fld *fld,
372 seqno_t seq, mdsno_t mds,
373 struct md_fld *md_fld)
375 struct fld_target *target;
379 target = fld_client_get_target(fld, seq);
383 rc = fld_client_rpc(target->fldt_exp, md_fld, FLD_CREATE);
386 /* do not return result of calling fld_cache_insert()
387 * here. First of all because it may return -EEXIST. Another
388 * reason is that, we do not want to stop proceeding because of
389 * cache errors. --umka */
390 fld_cache_insert(fld->fld_cache, seq, mds);
/* exported entry point for creating a seq -> mds mapping. Builds the md_fld
 * record on the stack from @seq and @mds and hands it to
 * __fld_client_create(). */
396 int fld_client_create(struct lu_client_fld *fld,
397 seqno_t seq, mdsno_t mds)
399 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
403 rc = __fld_client_create(fld, seq, mds, &md_fld);
406 EXPORT_SYMBOL(fld_client_create);
/* delete the mapping for @seq: the local cache entry is dropped first, then
 * the responsible target is looked up and a request is sent to it. The
 * remaining arguments of the fld_client_rpc() call and the return statement
 * are missing from this listing. */
408 static int __fld_client_delete(struct lu_client_fld *fld,
409 seqno_t seq, struct md_fld *md_fld)
411 struct fld_target *target;
414 fld_cache_delete(fld->fld_cache, seq);
416 target = fld_client_get_target(fld, seq);
420 rc = fld_client_rpc(target->fldt_exp,
/* exported entry point for deleting the mapping of @seq. Builds an md_fld
 * record with the sequence set and mf_mds zeroed, and hands it to
 * __fld_client_delete(). */
425 int fld_client_delete(struct lu_client_fld *fld,
428 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
431 rc = __fld_client_delete(fld, seq, &md_fld);
434 EXPORT_SYMBOL(fld_client_delete);
/* resolve @seq to an mds number, stored through @mds. The local cache is
 * consulted first; the code after that asks the responsible target via
 * fld_client_rpc(), copies mf_mds from @md_fld into *@mds and caches the
 * result. The checks between the steps, the rest of the rpc call and the
 * return statements are missing from this listing. */
436 static int __fld_client_lookup(struct lu_client_fld *fld,
437 seqno_t seq, mdsno_t *mds,
438 struct md_fld *md_fld)
440 struct fld_target *target;
444 /* lookup it in the cache */
445 rc = fld_cache_lookup(fld->fld_cache, seq, mds);
449 /* can not find it in the cache */
450 target = fld_client_get_target(fld, seq);
454 rc = fld_client_rpc(target->fldt_exp,
457 *mds = md_fld->mf_mds;
459 /* do not return error here as well. See previous comment in same
460 * situation in function __fld_client_create(). --umka */
461 fld_cache_insert(fld->fld_cache, seq, *mds);
/* exported entry point for resolving @seq to an mds number. Builds an md_fld
 * record with the sequence set and mf_mds zeroed, and hands it to
 * __fld_client_lookup(), which stores the answer through @mds. */
466 int fld_client_lookup(struct lu_client_fld *fld,
467 seqno_t seq, mdsno_t *mds)
469 struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
473 rc = __fld_client_lookup(fld, seq, mds, &md_fld);
476 EXPORT_SYMBOL(fld_client_lookup);