Whamcloud - gitweb
- merge with 1_5,some fixes.
[fs/lustre-release.git] / lustre / fld / fld_request.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_request.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_FLD
32
33 #ifdef __KERNEL__
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
41 #endif
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
54
55 static int fld_rrb_hash(struct lu_client_fld *fld,
56                         seqno_t seq)
57 {
58         LASSERT(fld->fld_count > 0);
59         return do_div(seq, fld->fld_count);
60 }
61
62 static struct fld_target *
63 fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
64 {
65         struct fld_target *target;
66         int hash;
67         ENTRY;
68
69         hash = fld_rrb_hash(fld, seq);
70
71         list_for_each_entry(target, &fld->fld_targets, fldt_chain) {
72                 if (target->fldt_idx == hash)
73                         RETURN(target);
74         }
75
76         /* if target is not found, there is logical error anyway, so here is
77          * LBUG() to catch this situation. */
78         LBUG();
79         RETURN(NULL);
80 }
81
82 static int fld_dht_hash(struct lu_client_fld *fld,
83                         seqno_t seq)
84 {
85         /* XXX: here should be DHT hash */
86         return fld_rrb_hash(fld, seq);
87 }
88
89 static struct fld_target *
90 fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
91 {
92         /* XXX: here should be DHT scan code */
93         return fld_dht_scan(fld, seq);
94 }
95
96 struct lu_fld_hash fld_hash[3] = {
97         {
98                 .fh_name = "DHT",
99                 .fh_hash_func = fld_dht_hash,
100                 .fh_scan_func = fld_dht_scan
101         },
102         {
103                 .fh_name = "RRB",
104                 .fh_hash_func = fld_rrb_hash,
105                 .fh_scan_func = fld_rrb_scan
106         },
107         {
108                 0,
109         }
110 };
111
112 static struct fld_target *
113 fld_client_get_target(struct lu_client_fld *fld,
114                       seqno_t seq)
115 {
116         struct fld_target *target;
117         ENTRY;
118
119         LASSERT(fld->fld_hash != NULL);
120
121         spin_lock(&fld->fld_lock);
122         target = fld->fld_hash->fh_scan_func(fld, seq);
123         spin_unlock(&fld->fld_lock);
124
125         RETURN(target);
126 }
127
128 /*
129  * Add export to FLD. This is usually done by CMM and LMV as they are main users
130  * of FLD module.
131  */
132 int fld_client_add_target(struct lu_client_fld *fld,
133                           struct obd_export *exp)
134 {
135         struct client_obd *cli = &exp->exp_obd->u.cli;
136         struct fld_target *target, *tmp;
137         ENTRY;
138
139         LASSERT(exp != NULL);
140
141         CDEBUG(D_INFO|D_WARNING, "%s: adding export %s\n",
142                fld->fld_name, cli->cl_target_uuid.uuid);
143
144         OBD_ALLOC_PTR(target);
145         if (target == NULL)
146                 RETURN(-ENOMEM);
147
148         spin_lock(&fld->fld_lock);
149         list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
150                 if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
151                                     &exp->exp_client_uuid))
152                 {
153                         spin_unlock(&fld->fld_lock);
154                         OBD_FREE_PTR(target);
155                         RETURN(-EEXIST);
156                 }
157         }
158
159         target->fldt_exp = class_export_get(exp);
160         target->fldt_idx = fld->fld_count;
161
162         list_add_tail(&target->fldt_chain,
163                       &fld->fld_targets);
164         fld->fld_count++;
165         spin_unlock(&fld->fld_lock);
166
167         RETURN(0);
168 }
169 EXPORT_SYMBOL(fld_client_add_target);
170
171 /* remove export from FLD */
172 int fld_client_del_target(struct lu_client_fld *fld,
173                           struct obd_export *exp)
174 {
175         struct fld_target *target, *tmp;
176         ENTRY;
177
178         spin_lock(&fld->fld_lock);
179         list_for_each_entry_safe(target, tmp,
180                                  &fld->fld_targets, fldt_chain) {
181                 if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
182                                     &exp->exp_client_uuid))
183                 {
184                         fld->fld_count--;
185                         list_del(&target->fldt_chain);
186                         spin_unlock(&fld->fld_lock);
187                         class_export_put(target->fldt_exp);
188                         OBD_FREE_PTR(target);
189                         RETURN(0);
190                 }
191         }
192         spin_unlock(&fld->fld_lock);
193         RETURN(-ENOENT);
194 }
195 EXPORT_SYMBOL(fld_client_del_target);
196
197 static void fld_client_proc_fini(struct lu_client_fld *fld);
198
199 #ifdef LPROCFS
200 static int fld_client_proc_init(struct lu_client_fld *fld)
201 {
202         int rc;
203         ENTRY;
204
205         fld->fld_proc_dir = lprocfs_register(fld->fld_name,
206                                              proc_lustre_root,
207                                              NULL, NULL);
208
209         if (IS_ERR(fld->fld_proc_dir)) {
210                 CERROR("LProcFS failed in fld-init\n");
211                 rc = PTR_ERR(fld->fld_proc_dir);
212                 RETURN(rc);
213         }
214
215         rc = lprocfs_add_vars(fld->fld_proc_dir,
216                               fld_client_proc_list, fld);
217         if (rc) {
218                 CERROR("can't init FLD "
219                        "proc, rc %d\n", rc);
220                 GOTO(out_cleanup, rc);
221         }
222
223         RETURN(0);
224
225 out_cleanup:
226         fld_client_proc_fini(fld);
227         return rc;
228 }
229
230 static void fld_client_proc_fini(struct lu_client_fld *fld)
231 {
232         ENTRY;
233         if (fld->fld_proc_dir) {
234                 if (!IS_ERR(fld->fld_proc_dir))
235                         lprocfs_remove(fld->fld_proc_dir);
236                 fld->fld_proc_dir = NULL;
237         }
238         EXIT;
239 }
240 #else
241 static int fld_client_proc_init(struct lu_client_fld *fld)
242 {
243         return 0;
244 }
245
246 static void fld_client_proc_fini(struct lu_client_fld *fld)
247 {
248         return;
249 }
250 #endif
251
252 static inline int hash_is_sane(int hash)
253 {
254         return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
255 }
256
257 /* 1M of FLD cache will not hurt client a lot */
258 #define FLD_CACHE_SIZE 1024000
259
260 /* cache threshold is 10 percent of size */
261 #define FLD_CACHE_THRESHOLD 10
262
263 int fld_client_init(struct lu_client_fld *fld,
264                     const char *uuid, int hash)
265 {
266 #ifdef __KERNEL__
267         int cache_size, cache_threshold;
268 #endif
269         int rc;
270         ENTRY;
271
272         LASSERT(fld != NULL);
273
274         if (!hash_is_sane(hash)) {
275                 CERROR("wrong hash function %#x\n", hash);
276                 RETURN(-EINVAL);
277         }
278
279         INIT_LIST_HEAD(&fld->fld_targets);
280         spin_lock_init(&fld->fld_lock);
281         fld->fld_hash = &fld_hash[hash];
282         fld->fld_count = 0;
283
284         snprintf(fld->fld_name, sizeof(fld->fld_name),
285                  "%s-cli-%s", LUSTRE_FLD_NAME, uuid);
286
287 #ifdef __KERNEL__
288         cache_size = FLD_CACHE_SIZE /
289                 sizeof(struct fld_cache_entry);
290
291         cache_threshold = cache_size *
292                 FLD_CACHE_THRESHOLD / 100;
293
294         fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE,
295                                         cache_size,
296                                         cache_threshold);
297         if (IS_ERR(fld->fld_cache)) {
298                 rc = PTR_ERR(fld->fld_cache);
299                 fld->fld_cache = NULL;
300                 GOTO(out, rc);
301         }
302 #endif
303
304         rc = fld_client_proc_init(fld);
305         if (rc)
306                 GOTO(out, rc);
307         EXIT;
308 out:
309         if (rc)
310                 fld_client_fini(fld);
311         else
312                 CDEBUG(D_INFO|D_WARNING,
313                        "Client FLD, using \"%s\" hash\n",
314                        fld->fld_hash->fh_name);
315         return rc;
316 }
317 EXPORT_SYMBOL(fld_client_init);
318
319 void fld_client_fini(struct lu_client_fld *fld)
320 {
321         struct fld_target *target, *tmp;
322         ENTRY;
323
324         fld_client_proc_fini(fld);
325
326         spin_lock(&fld->fld_lock);
327         list_for_each_entry_safe(target, tmp,
328                                  &fld->fld_targets, fldt_chain) {
329                 fld->fld_count--;
330                 list_del(&target->fldt_chain);
331                 class_export_put(target->fldt_exp);
332                 OBD_FREE_PTR(target);
333         }
334         spin_unlock(&fld->fld_lock);
335
336 #ifdef __KERNEL__
337         if (fld->fld_cache != NULL) {
338                 if (!IS_ERR(fld->fld_cache))
339                         fld_cache_fini(fld->fld_cache);
340                 fld->fld_cache = NULL;
341         }
342 #endif
343
344         CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
345         EXIT;
346 }
347 EXPORT_SYMBOL(fld_client_fini);
348
349 static int fld_client_rpc(struct obd_export *exp,
350                           struct md_fld *mf, __u32 fld_op)
351 {
352         int reqsize[3] = { sizeof(struct ptlrpc_body),
353                            sizeof(__u32),
354                            sizeof(struct md_fld) };
355         int repsize[2] = { sizeof(struct ptlrpc_body),
356                            sizeof(struct md_fld) };
357         struct ptlrpc_request *req;
358         struct req_capsule pill;
359         struct md_fld *pmf;
360         __u32 *op;
361         int rc;
362         ENTRY;
363
364         LASSERT(exp != NULL);
365
366         req = ptlrpc_prep_req(class_exp2cliimp(exp),
367                               LUSTRE_MDS_VERSION, FLD_QUERY,
368                               3, reqsize, NULL);
369         if (req == NULL)
370                 RETURN(-ENOMEM);
371
372         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
373
374         req_capsule_set(&pill, &RQF_FLD_QUERY);
375
376         op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
377         *op = fld_op;
378
379         pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
380         *pmf = *mf;
381
382         ptlrpc_req_set_repsize(req, 2, repsize);
383         req->rq_request_portal = FLD_REQUEST_PORTAL;
384
385         rc = ptlrpc_queue_wait(req);
386         if (rc)
387                 GOTO(out_req, rc);
388
389         pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
390         if (pmf == NULL)
391                 GOTO(out_req, rc = -EFAULT);
392         *mf = *pmf;
393         EXIT;
394 out_req:
395         req_capsule_fini(&pill);
396         ptlrpc_req_finished(req);
397         return rc;
398 }
399
400 int fld_client_create(struct lu_client_fld *fld,
401                       seqno_t seq, mdsno_t mds)
402 {
403         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
404         struct fld_target *target;
405         int rc;
406         ENTRY;
407
408         target = fld_client_get_target(fld, seq);
409         LASSERT(target != NULL);
410
411         rc = fld_client_rpc(target->fldt_exp, &md_fld, FLD_CREATE);
412
413         if (rc == 0) {
414                 /*
415                  * Do not return result of calling fld_cache_insert()
416                  * here. First of all because it may return -EEXISTS. Another
417                  * reason is that, we do not want to stop proceeding because of
418                  * cache errors. --umka
419                  */
420                 fld_cache_insert(fld->fld_cache, seq, mds);
421         }
422         RETURN(rc);
423 }
424 EXPORT_SYMBOL(fld_client_create);
425
426 int fld_client_delete(struct lu_client_fld *fld,
427                       seqno_t seq)
428 {
429         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
430         struct fld_target *target;
431         int rc;
432         ENTRY;
433
434         fld_cache_delete(fld->fld_cache, seq);
435
436         target = fld_client_get_target(fld, seq);
437         LASSERT(target != NULL);
438
439         rc = fld_client_rpc(target->fldt_exp,
440                             &md_fld, FLD_DELETE);
441
442         RETURN(rc);
443 }
444 EXPORT_SYMBOL(fld_client_delete);
445
446 int fld_client_lookup(struct lu_client_fld *fld,
447                       seqno_t seq, mdsno_t *mds)
448 {
449         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
450         struct fld_target *target;
451         int rc;
452         ENTRY;
453
454         /* lookup it in the cache */
455         rc = fld_cache_lookup(fld->fld_cache, seq, mds);
456         if (rc == 0)
457                 RETURN(0);
458
459         /* can not find it in the cache */
460         target = fld_client_get_target(fld, seq);
461         LASSERT(target != NULL);
462
463         rc = fld_client_rpc(target->fldt_exp,
464                             &md_fld, FLD_LOOKUP);
465         if (rc == 0) {
466                 *mds = md_fld.mf_mds;
467
468                 /*
469                  * Do not return error here as well. See previous comment in
470                  * same situation in function fld_client_create(). --umka
471                  */
472                 fld_cache_insert(fld->fld_cache, seq, *mds);
473         }
474         RETURN(rc);
475 }
476 EXPORT_SYMBOL(fld_client_lookup);