Whamcloud - gitweb
b1b66b97e614148ac429d097eb196f4b439a9dd9
[fs/lustre-release.git] / lustre / fld / fld_request.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_request.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_FLD
32
33 #ifdef __KERNEL__
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
41 #endif
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
54
55 static int fld_rrb_hash(struct lu_client_fld *fld,
56                         seqno_t seq)
57 {
58         LASSERT(fld->lcf_count > 0);
59         return do_div(seq, fld->lcf_count);
60 }
61
62 static struct lu_fld_target *
63 fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
64 {
65         struct lu_fld_target *target;
66         int hash;
67         ENTRY;
68
69         hash = fld_rrb_hash(fld, seq);
70
71         list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
72                 if (target->ft_idx == hash)
73                         RETURN(target);
74         }
75
76         /* if target is not found, there is logical error anyway, so here is
77          * LBUG() to catch this situation. */
78         LBUG();
79         RETURN(NULL);
80 }
81
82 static int fld_dht_hash(struct lu_client_fld *fld,
83                         seqno_t seq)
84 {
85         /* XXX: here should be DHT hash */
86         return fld_rrb_hash(fld, seq);
87 }
88
89 static struct lu_fld_target *
90 fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
91 {
92         /* XXX: here should be DHT scan code */
93         return fld_rrb_scan(fld, seq);
94 }
95
96 struct lu_fld_hash fld_hash[3] = {
97         {
98                 .fh_name = "DHT",
99                 .fh_hash_func = fld_dht_hash,
100                 .fh_scan_func = fld_dht_scan
101         },
102         {
103                 .fh_name = "RRB",
104                 .fh_hash_func = fld_rrb_hash,
105                 .fh_scan_func = fld_rrb_scan
106         },
107         {
108                 0,
109         }
110 };
111
112 static struct lu_fld_target *
113 fld_client_get_target(struct lu_client_fld *fld,
114                       seqno_t seq)
115 {
116         struct lu_fld_target *target;
117         ENTRY;
118
119         LASSERT(fld->lcf_hash != NULL);
120
121         spin_lock(&fld->lcf_lock);
122         target = fld->lcf_hash->fh_scan_func(fld, seq);
123         spin_unlock(&fld->lcf_lock);
124
125         RETURN(target);
126 }
127
128 /*
129  * Add export to FLD. This is usually done by CMM and LMV as they are main users
130  * of FLD module.
131  */
132 int fld_client_add_target(struct lu_client_fld *fld,
133                           struct lu_fld_target *tar)
134 {
135         const char *tar_name = fld_target_name(tar);
136         struct lu_fld_target *target, *tmp;
137         ENTRY;
138
139         LASSERT(tar != NULL);
140         LASSERT(tar_name != NULL);
141         LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
142
143         CDEBUG(D_INFO|D_WARNING, "%s: adding target %s\n",
144                fld->lcf_name, tar_name);
145
146         OBD_ALLOC_PTR(target);
147         if (target == NULL)
148                 RETURN(-ENOMEM);
149
150         spin_lock(&fld->lcf_lock);
151         list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
152                 const char *tmp_name = fld_target_name(tmp);
153                 
154                 if (strlen(tar_name) == strlen(tmp_name) &&
155                     strcmp(tmp_name, tar_name) == 0)
156                 {
157                         spin_unlock(&fld->lcf_lock);
158                         OBD_FREE_PTR(target);
159                         RETURN(-EEXIST);
160                 }
161         }
162
163         target->ft_exp = tar->ft_exp;
164         target->ft_srv = tar->ft_srv;
165         target->ft_idx = tar->ft_idx;
166
167         list_add_tail(&target->ft_chain,
168                       &fld->lcf_targets);
169         
170         fld->lcf_count++;
171         spin_unlock(&fld->lcf_lock);
172
173         RETURN(0);
174 }
175 EXPORT_SYMBOL(fld_client_add_target);
176
177 /* remove export from FLD */
178 int fld_client_del_target(struct lu_client_fld *fld,
179                           __u64 idx)
180 {
181         struct lu_fld_target *target, *tmp;
182         ENTRY;
183
184         spin_lock(&fld->lcf_lock);
185         list_for_each_entry_safe(target, tmp,
186                                  &fld->lcf_targets, ft_chain) {
187                 if (target->ft_idx == idx) {
188                         fld->lcf_count--;
189                         list_del(&target->ft_chain);
190                         spin_unlock(&fld->lcf_lock);
191                         class_export_put(target->ft_exp);
192                         OBD_FREE_PTR(target);
193                         RETURN(0);
194                 }
195         }
196         spin_unlock(&fld->lcf_lock);
197         RETURN(-ENOENT);
198 }
199 EXPORT_SYMBOL(fld_client_del_target);
200
201 static void fld_client_proc_fini(struct lu_client_fld *fld);
202
203 #ifdef LPROCFS
204 static int fld_client_proc_init(struct lu_client_fld *fld)
205 {
206         int rc;
207         ENTRY;
208
209         fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
210                                              proc_lustre_root,
211                                              NULL, NULL);
212
213         if (IS_ERR(fld->lcf_proc_dir)) {
214                 CERROR("LProcFS failed in fld-init\n");
215                 rc = PTR_ERR(fld->lcf_proc_dir);
216                 RETURN(rc);
217         }
218
219         rc = lprocfs_add_vars(fld->lcf_proc_dir,
220                               fld_client_proc_list, fld);
221         if (rc) {
222                 CERROR("can't init FLD "
223                        "proc, rc %d\n", rc);
224                 GOTO(out_cleanup, rc);
225         }
226
227         RETURN(0);
228
229 out_cleanup:
230         fld_client_proc_fini(fld);
231         return rc;
232 }
233
234 static void fld_client_proc_fini(struct lu_client_fld *fld)
235 {
236         ENTRY;
237         if (fld->lcf_proc_dir) {
238                 if (!IS_ERR(fld->lcf_proc_dir))
239                         lprocfs_remove(fld->lcf_proc_dir);
240                 fld->lcf_proc_dir = NULL;
241         }
242         EXIT;
243 }
244 #else
/* Stub used when LPROCFS is not defined: nothing to register, always
 * reports success. */
static int fld_client_proc_init(struct lu_client_fld *fld)
{
        return 0;
}
249
/* Stub used when LPROCFS is not defined: there is nothing to tear down. */
static void fld_client_proc_fini(struct lu_client_fld *fld)
{
}
254 #endif
255
256 static inline int hash_is_sane(int hash)
257 {
258         return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
259 }
260
/*
 * Byte budget for the client FLD cache: roughly 1M (1024000 bytes), which
 * will not hurt the client a lot. fld_client_init() divides it by
 * sizeof(struct fld_cache_entry) to obtain the cache size in entries.
 */
#define FLD_CACHE_SIZE 1024000

/*
 * Cache threshold, as a percentage of the cache size: fld_client_init()
 * passes cache_size * FLD_CACHE_THRESHOLD / 100 to fld_cache_init().
 */
#define FLD_CACHE_THRESHOLD 10
266
267 int fld_client_init(struct lu_client_fld *fld,
268                     const char *prefix, int hash,
269                     const struct lu_context *ctx)
270 {
271 #ifdef __KERNEL__
272         int cache_size, cache_threshold;
273 #endif
274         int rc;
275         ENTRY;
276
277         LASSERT(fld != NULL);
278
279         if (!hash_is_sane(hash)) {
280                 CERROR("Wrong hash function %#x\n", hash);
281                 RETURN(-EINVAL);
282         }
283
284         fld->lcf_count = 0;
285         fld->lcf_ctx = ctx;
286         spin_lock_init(&fld->lcf_lock);
287         fld->lcf_hash = &fld_hash[hash];
288         INIT_LIST_HEAD(&fld->lcf_targets);
289
290         snprintf(fld->lcf_name, sizeof(fld->lcf_name),
291                  "%s-cli-%s", LUSTRE_FLD_NAME, prefix);
292
293 #ifdef __KERNEL__
294         cache_size = FLD_CACHE_SIZE /
295                 sizeof(struct fld_cache_entry);
296
297         cache_threshold = cache_size *
298                 FLD_CACHE_THRESHOLD / 100;
299
300         fld->lcf_cache = fld_cache_init(FLD_HTABLE_SIZE,
301                                         cache_size,
302                                         cache_threshold);
303         if (IS_ERR(fld->lcf_cache)) {
304                 rc = PTR_ERR(fld->lcf_cache);
305                 fld->lcf_cache = NULL;
306                 GOTO(out, rc);
307         }
308 #endif
309
310         rc = fld_client_proc_init(fld);
311         if (rc)
312                 GOTO(out, rc);
313         EXIT;
314 out:
315         if (rc)
316                 fld_client_fini(fld);
317         else
318                 CDEBUG(D_INFO|D_WARNING,
319                        "Client FLD, using \"%s\" hash\n",
320                        fld->lcf_hash->fh_name);
321         return rc;
322 }
323 EXPORT_SYMBOL(fld_client_init);
324
325 void fld_client_fini(struct lu_client_fld *fld)
326 {
327         struct lu_fld_target *target, *tmp;
328         ENTRY;
329
330         fld_client_proc_fini(fld);
331
332         spin_lock(&fld->lcf_lock);
333         list_for_each_entry_safe(target, tmp,
334                                  &fld->lcf_targets, ft_chain) {
335                 fld->lcf_count--;
336                 list_del(&target->ft_chain);
337                 if (target->ft_exp != NULL)
338                         class_export_put(target->ft_exp);
339                 OBD_FREE_PTR(target);
340         }
341         spin_unlock(&fld->lcf_lock);
342
343 #ifdef __KERNEL__
344         if (fld->lcf_cache != NULL) {
345                 if (!IS_ERR(fld->lcf_cache))
346                         fld_cache_fini(fld->lcf_cache);
347                 fld->lcf_cache = NULL;
348         }
349 #endif
350
351         CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
352         EXIT;
353 }
354 EXPORT_SYMBOL(fld_client_fini);
355
356 static int fld_client_rpc(struct obd_export *exp,
357                           struct md_fld *mf, __u32 fld_op)
358 {
359         int size[3] = { sizeof(struct ptlrpc_body),
360                         sizeof(__u32),
361                         sizeof(struct md_fld) };
362         struct ptlrpc_request *req;
363         struct req_capsule pill;
364         struct md_fld *pmf;
365         __u32 *op;
366         int rc;
367         ENTRY;
368
369         LASSERT(exp != NULL);
370
371         req = ptlrpc_prep_req(class_exp2cliimp(exp),
372                               LUSTRE_MDS_VERSION, FLD_QUERY,
373                               3, size, NULL);
374         if (req == NULL)
375                 RETURN(-ENOMEM);
376
377         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
378
379         req_capsule_set(&pill, &RQF_FLD_QUERY);
380
381         op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
382         *op = fld_op;
383
384         pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
385         *pmf = *mf;
386
387         size[1] = sizeof(struct md_fld);
388         ptlrpc_req_set_repsize(req, 2, size);
389         req->rq_request_portal = FLD_REQUEST_PORTAL;
390
391         rc = ptlrpc_queue_wait(req);
392         if (rc)
393                 GOTO(out_req, rc);
394
395         pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
396         if (pmf == NULL)
397                 GOTO(out_req, rc = -EFAULT);
398         *mf = *pmf;
399         EXIT;
400 out_req:
401         req_capsule_fini(&pill);
402         ptlrpc_req_finished(req);
403         return rc;
404 }
405
406 int fld_client_create(struct lu_client_fld *fld,
407                       seqno_t seq, mdsno_t mds)
408 {
409         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
410         struct lu_fld_target *target;
411         int rc;
412         ENTRY;
413
414         target = fld_client_get_target(fld, seq);
415         LASSERT(target != NULL);
416
417 #ifdef __KERNEL__
418         if (target->ft_srv != NULL) {
419                 LASSERT(fld->lcf_ctx != NULL);
420                 rc = fld_server_create(target->ft_srv,
421                                        fld->lcf_ctx,
422                                        seq, mds);
423         } else {
424 #endif
425                 rc = fld_client_rpc(target->ft_exp,
426                                     &md_fld, FLD_CREATE);
427 #ifdef __KERNEL__
428         }
429 #endif
430
431         if (rc == 0) {
432                 /*
433                  * Do not return result of calling fld_cache_insert()
434                  * here. First of all because it may return -EEXISTS. Another
435                  * reason is that, we do not want to stop proceeding because of
436                  * cache errors. --umka
437                  */
438                 fld_cache_insert(fld->lcf_cache, seq, mds);
439         }
440         RETURN(rc);
441 }
442 EXPORT_SYMBOL(fld_client_create);
443
444 int fld_client_delete(struct lu_client_fld *fld,
445                       seqno_t seq)
446 {
447         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
448         struct lu_fld_target *target;
449         int rc;
450         ENTRY;
451
452         fld_cache_delete(fld->lcf_cache, seq);
453
454         target = fld_client_get_target(fld, seq);
455         LASSERT(target != NULL);
456
457 #ifdef __KERNEL__
458         if (target->ft_srv != NULL) {
459                 LASSERT(fld->lcf_ctx != NULL);
460                 rc = fld_server_delete(target->ft_srv,
461                                        fld->lcf_ctx,
462                                        seq);
463         } else {
464 #endif
465                 rc = fld_client_rpc(target->ft_exp,
466                                     &md_fld, FLD_DELETE);
467 #ifdef __KERNEL__
468         }
469 #endif
470
471         RETURN(rc);
472 }
473 EXPORT_SYMBOL(fld_client_delete);
474
475 int fld_client_lookup(struct lu_client_fld *fld,
476                       seqno_t seq, mdsno_t *mds)
477 {
478         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
479         struct lu_fld_target *target;
480         int rc;
481         ENTRY;
482
483         /* lookup it in the cache */
484         rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
485         if (rc == 0)
486                 RETURN(0);
487
488         /* can not find it in the cache */
489         target = fld_client_get_target(fld, seq);
490         LASSERT(target != NULL);
491
492 #ifdef __KERNEL__
493         if (target->ft_srv != NULL) {
494                 LASSERT(fld->lcf_ctx != NULL);
495                 rc = fld_server_lookup(target->ft_srv,
496                                        fld->lcf_ctx,
497                                        seq, mds);
498         } else {
499 #endif
500                 rc = fld_client_rpc(target->ft_exp,
501                                     &md_fld, FLD_LOOKUP);
502 #ifdef __KERNEL__
503         }
504 #endif
505         if (rc == 0) {
506                 *mds = md_fld.mf_mds;
507
508                 /*
509                  * Do not return error here as well. See previous comment in
510                  * same situation in function fld_client_create(). --umka
511                  */
512                 fld_cache_insert(fld->lcf_cache, seq, *mds);
513         }
514         RETURN(rc);
515 }
516 EXPORT_SYMBOL(fld_client_lookup);