Whamcloud - gitweb
29a281324e17069e26fdee5354bb4cef7906d46c
[fs/lustre-release.git] / lustre / fld / fld_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/fld/fld_request.c
37  *
38  * FLD (Fids Location Database)
39  *
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_FLD
47
48 #ifdef __KERNEL__
49 # include <libcfs/libcfs.h>
50 # include <linux/module.h>
51 # include <linux/jbd.h>
52 # include <asm/div64.h>
53 #else /* __KERNEL__ */
54 # include <liblustre.h>
55 # include <libcfs/list.h>
56 #endif
57
58 #include <obd.h>
59 #include <obd_class.h>
60 #include <lustre_ver.h>
61 #include <obd_support.h>
62 #include <lprocfs_status.h>
63
64 #include <dt_object.h>
65 #include <md_object.h>
66 #include <lustre_req_layout.h>
67 #include <lustre_fld.h>
68 #include <lustre_mdc.h>
69 #include "fld_internal.h"
70
/* TODO: the three functions below are copies of the flow-control code in
 * mdc_lib.c and should be moved to a common place; the same applies to the
 * MDC RPC lock. */
73 static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
74 {
75         int rc;
76         ENTRY;
77         spin_lock(&cli->cl_loi_list_lock);
78         rc = list_empty(&mcw->mcw_entry);
79         spin_unlock(&cli->cl_loi_list_lock);
80         RETURN(rc);
81 };
82
83 static void fld_enter_request(struct client_obd *cli)
84 {
85         struct mdc_cache_waiter mcw;
86         struct l_wait_info lwi = { 0 };
87
88         spin_lock(&cli->cl_loi_list_lock);
89         if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
90                 list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
91                 cfs_waitq_init(&mcw.mcw_waitq);
92                 spin_unlock(&cli->cl_loi_list_lock);
93                 l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi);
94         } else {
95                 cli->cl_r_in_flight++;
96                 spin_unlock(&cli->cl_loi_list_lock);
97         }
98 }
99
100 static void fld_exit_request(struct client_obd *cli)
101 {
102         struct list_head *l, *tmp;
103         struct mdc_cache_waiter *mcw;
104
105         spin_lock(&cli->cl_loi_list_lock);
106         cli->cl_r_in_flight--;
107         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
108                 
109                 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
110                         /* No free request slots anymore */
111                         break;
112                 }
113
114                 mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry);
115                 list_del_init(&mcw->mcw_entry);
116                 cli->cl_r_in_flight++;
117                 cfs_waitq_signal(&mcw->mcw_waitq);
118         }
119         spin_unlock(&cli->cl_loi_list_lock);
120 }
121
122 static int fld_rrb_hash(struct lu_client_fld *fld,
123                         seqno_t seq)
124 {
125         LASSERT(fld->lcf_count > 0);
126         return do_div(seq, fld->lcf_count);
127 }
128
129 static struct lu_fld_target *
130 fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
131 {
132         struct lu_fld_target *target;
133         int hash;
134         ENTRY;
135
136         hash = fld_rrb_hash(fld, seq);
137
138         list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
139                 if (target->ft_idx == hash)
140                         RETURN(target);
141         }
142
143         CERROR("%s: Can't find target by hash %d (seq "LPX64"). "
144                "Targets (%d):\n", fld->lcf_name, hash, seq,
145                fld->lcf_count);
146
147         list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
148                 const char *srv_name = target->ft_srv != NULL  ?
149                         target->ft_srv->lsf_name : "<null>";
150                 const char *exp_name = target->ft_exp != NULL ?
151                         (char *)target->ft_exp->exp_obd->obd_uuid.uuid :
152                         "<null>";
153
154                 CERROR("  exp: 0x%p (%s), srv: 0x%p (%s), idx: "LPU64"\n",
155                        target->ft_exp, exp_name, target->ft_srv,
156                        srv_name, target->ft_idx);
157         }
158
159         /*
160          * If target is not found, there is logical error anyway, so here is
161          * LBUG() to catch this situation.
162          */
163         LBUG();
164         RETURN(NULL);
165 }
166
167 static int fld_dht_hash(struct lu_client_fld *fld,
168                         seqno_t seq)
169 {
170         /* XXX: here should be DHT hash */
171         return fld_rrb_hash(fld, seq);
172 }
173
174 static struct lu_fld_target *
175 fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
176 {
177         /* XXX: here should be DHT scan code */
178         return fld_rrb_scan(fld, seq);
179 }
180
181 struct lu_fld_hash fld_hash[3] = {
182         {
183                 .fh_name = "DHT",
184                 .fh_hash_func = fld_dht_hash,
185                 .fh_scan_func = fld_dht_scan
186         },
187         {
188                 .fh_name = "RRB",
189                 .fh_hash_func = fld_rrb_hash,
190                 .fh_scan_func = fld_rrb_scan
191         },
192         {
193                 0,
194         }
195 };
196
197 static struct lu_fld_target *
198 fld_client_get_target(struct lu_client_fld *fld,
199                       seqno_t seq)
200 {
201         struct lu_fld_target *target;
202         ENTRY;
203
204         LASSERT(fld->lcf_hash != NULL);
205
206         spin_lock(&fld->lcf_lock);
207         target = fld->lcf_hash->fh_scan_func(fld, seq);
208         spin_unlock(&fld->lcf_lock);
209
210         if (target != NULL) {
211                 CDEBUG(D_INFO, "%s: Found target (idx "LPU64
212                        ") by seq "LPX64"\n", fld->lcf_name,
213                        target->ft_idx, seq);
214         }
215
216         RETURN(target);
217 }
218
219 /*
220  * Add export to FLD. This is usually done by CMM and LMV as they are main users
221  * of FLD module.
222  */
223 int fld_client_add_target(struct lu_client_fld *fld,
224                           struct lu_fld_target *tar)
225 {
226         const char *name = fld_target_name(tar);
227         struct lu_fld_target *target, *tmp;
228         ENTRY;
229
230         LASSERT(tar != NULL);
231         LASSERT(name != NULL);
232         LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
233
234         if (fld->lcf_flags != LUSTRE_FLD_INIT) {
235                 CERROR("%s: Attempt to add target %s (idx "LPU64") "
236                        "on fly - skip it\n", fld->lcf_name, name,
237                        tar->ft_idx);
238                 RETURN(0);
239         } else {
240                 CDEBUG(D_INFO, "%s: Adding target %s (idx "
241                        LPU64")\n", fld->lcf_name, name, tar->ft_idx);
242         }
243
244         OBD_ALLOC_PTR(target);
245         if (target == NULL)
246                 RETURN(-ENOMEM);
247
248         spin_lock(&fld->lcf_lock);
249         list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
250                 if (tmp->ft_idx == tar->ft_idx) {
251                         spin_unlock(&fld->lcf_lock);
252                         OBD_FREE_PTR(target);
253                         CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n",
254                                name, fld_target_name(tmp), tmp->ft_idx);
255                         RETURN(-EEXIST);
256                 }
257         }
258
259         target->ft_exp = tar->ft_exp;
260         if (target->ft_exp != NULL)
261                 class_export_get(target->ft_exp);
262         target->ft_srv = tar->ft_srv;
263         target->ft_idx = tar->ft_idx;
264
265         list_add_tail(&target->ft_chain,
266                       &fld->lcf_targets);
267
268         fld->lcf_count++;
269         spin_unlock(&fld->lcf_lock);
270
271         RETURN(0);
272 }
273 EXPORT_SYMBOL(fld_client_add_target);
274
275 /* Remove export from FLD */
276 int fld_client_del_target(struct lu_client_fld *fld,
277                           __u64 idx)
278 {
279         struct lu_fld_target *target, *tmp;
280         ENTRY;
281
282         spin_lock(&fld->lcf_lock);
283         list_for_each_entry_safe(target, tmp,
284                                  &fld->lcf_targets, ft_chain) {
285                 if (target->ft_idx == idx) {
286                         fld->lcf_count--;
287                         list_del(&target->ft_chain);
288                         spin_unlock(&fld->lcf_lock);
289
290                         if (target->ft_exp != NULL)
291                                 class_export_put(target->ft_exp);
292
293                         OBD_FREE_PTR(target);
294                         RETURN(0);
295                 }
296         }
297         spin_unlock(&fld->lcf_lock);
298         RETURN(-ENOENT);
299 }
300 EXPORT_SYMBOL(fld_client_del_target);
301
302 static void fld_client_proc_fini(struct lu_client_fld *fld);
303
304 #ifdef LPROCFS
305 static int fld_client_proc_init(struct lu_client_fld *fld)
306 {
307         int rc;
308         ENTRY;
309
310         fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
311                                              fld_type_proc_dir,
312                                              NULL, NULL);
313
314         if (IS_ERR(fld->lcf_proc_dir)) {
315                 CERROR("%s: LProcFS failed in fld-init\n",
316                        fld->lcf_name);
317                 rc = PTR_ERR(fld->lcf_proc_dir);
318                 RETURN(rc);
319         }
320
321         rc = lprocfs_add_vars(fld->lcf_proc_dir,
322                               fld_client_proc_list, fld);
323         if (rc) {
324                 CERROR("%s: Can't init FLD proc, rc %d\n",
325                        fld->lcf_name, rc);
326                 GOTO(out_cleanup, rc);
327         }
328
329         RETURN(0);
330
331 out_cleanup:
332         fld_client_proc_fini(fld);
333         return rc;
334 }
335
336 static void fld_client_proc_fini(struct lu_client_fld *fld)
337 {
338         ENTRY;
339         if (fld->lcf_proc_dir) {
340                 if (!IS_ERR(fld->lcf_proc_dir))
341                         lprocfs_remove(&fld->lcf_proc_dir);
342                 fld->lcf_proc_dir = NULL;
343         }
344         EXIT;
345 }
346 #else
/* Built without LPROCFS: there is nothing to register, report success. */
static int fld_client_proc_init(struct lu_client_fld *fld)
{
        return 0;
}
351
/* Built without LPROCFS: there is nothing to tear down. */
static void fld_client_proc_fini(struct lu_client_fld *fld)
{
}
356 #endif
357
358 static inline int hash_is_sane(int hash)
359 {
360         return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
361 }
362
363 int fld_client_init(struct lu_client_fld *fld,
364                     const char *prefix, int hash)
365 {
366 #ifdef __KERNEL__
367         int cache_size, cache_threshold;
368 #endif
369         int rc;
370         ENTRY;
371
372         LASSERT(fld != NULL);
373
374         snprintf(fld->lcf_name, sizeof(fld->lcf_name),
375                  "cli-%s", prefix);
376
377         if (!hash_is_sane(hash)) {
378                 CERROR("%s: Wrong hash function %#x\n",
379                        fld->lcf_name, hash);
380                 RETURN(-EINVAL);
381         }
382
383         fld->lcf_count = 0;
384         spin_lock_init(&fld->lcf_lock);
385         fld->lcf_hash = &fld_hash[hash];
386         fld->lcf_flags = LUSTRE_FLD_INIT;
387         CFS_INIT_LIST_HEAD(&fld->lcf_targets);
388
389 #ifdef __KERNEL__
390         cache_size = FLD_CLIENT_CACHE_SIZE /
391                 sizeof(struct fld_cache_entry);
392
393         cache_threshold = cache_size *
394                 FLD_CLIENT_CACHE_THRESHOLD / 100;
395
396         fld->lcf_cache = fld_cache_init(fld->lcf_name,
397                                         FLD_CLIENT_HTABLE_SIZE,
398                                         cache_size, cache_threshold);
399         if (IS_ERR(fld->lcf_cache)) {
400                 rc = PTR_ERR(fld->lcf_cache);
401                 fld->lcf_cache = NULL;
402                 GOTO(out, rc);
403         }
404 #endif
405
406         rc = fld_client_proc_init(fld);
407         if (rc)
408                 GOTO(out, rc);
409         EXIT;
410 out:
411         if (rc)
412                 fld_client_fini(fld);
413         else
414                 CDEBUG(D_INFO, "%s: Using \"%s\" hash\n",
415                        fld->lcf_name, fld->lcf_hash->fh_name);
416         return rc;
417 }
418 EXPORT_SYMBOL(fld_client_init);
419
420 void fld_client_fini(struct lu_client_fld *fld)
421 {
422         struct lu_fld_target *target, *tmp;
423         ENTRY;
424
425         fld_client_proc_fini(fld);
426
427         spin_lock(&fld->lcf_lock);
428         list_for_each_entry_safe(target, tmp,
429                                  &fld->lcf_targets, ft_chain) {
430                 fld->lcf_count--;
431                 list_del(&target->ft_chain);
432                 if (target->ft_exp != NULL)
433                         class_export_put(target->ft_exp);
434                 OBD_FREE_PTR(target);
435         }
436         spin_unlock(&fld->lcf_lock);
437
438 #ifdef __KERNEL__
439         if (fld->lcf_cache != NULL) {
440                 if (!IS_ERR(fld->lcf_cache))
441                         fld_cache_fini(fld->lcf_cache);
442                 fld->lcf_cache = NULL;
443         }
444 #endif
445
446         EXIT;
447 }
448 EXPORT_SYMBOL(fld_client_fini);
449
450 static int fld_client_rpc(struct obd_export *exp,
451                           struct md_fld *mf, __u32 fld_op)
452 {
453         struct ptlrpc_request *req;
454         struct md_fld         *pmf;
455         __u32                 *op;
456         int                    rc;
457         ENTRY;
458
459         LASSERT(exp != NULL);
460
461         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_FLD_QUERY,
462                                         LUSTRE_MDS_VERSION, FLD_QUERY);
463         if (req == NULL)
464                 RETURN(-ENOMEM);
465
466         op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
467         *op = fld_op;
468
469         pmf = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
470         *pmf = *mf;
471
472         ptlrpc_request_set_replen(req);
473         req->rq_request_portal = FLD_REQUEST_PORTAL;
474         ptlrpc_at_set_req_timeout(req);
475
476         if (fld_op != FLD_LOOKUP)
477                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
478         fld_enter_request(&exp->exp_obd->u.cli);
479         rc = ptlrpc_queue_wait(req);
480         fld_exit_request(&exp->exp_obd->u.cli);
481         if (fld_op != FLD_LOOKUP)
482                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
483         if (rc)
484                 GOTO(out_req, rc);
485
486         pmf = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
487         if (pmf == NULL)
488                 GOTO(out_req, rc = -EFAULT);
489         *mf = *pmf;
490         EXIT;
491 out_req:
492         ptlrpc_req_finished(req);
493         return rc;
494 }
495
496 int fld_client_create(struct lu_client_fld *fld,
497                       seqno_t seq, mdsno_t mds,
498                       const struct lu_env *env)
499 {
500         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
501         struct lu_fld_target *target;
502         int rc;
503         ENTRY;
504
505         fld->lcf_flags |= LUSTRE_FLD_RUN;
506         target = fld_client_get_target(fld, seq);
507         LASSERT(target != NULL);
508
509         CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: "
510                LPU64") on target %s (idx "LPU64")\n", fld->lcf_name,
511                seq, mds, fld_target_name(target), target->ft_idx);
512
513 #ifdef __KERNEL__
514         if (target->ft_srv != NULL) {
515                 LASSERT(env != NULL);
516                 rc = fld_server_create(target->ft_srv, env, seq, mds);
517         } else {
518 #endif
519                 rc = fld_client_rpc(target->ft_exp, &md_fld, FLD_CREATE);
520 #ifdef __KERNEL__
521         }
522 #endif
523
524         if (rc == 0) {
525                 /*
526                  * Do not return result of calling fld_cache_insert()
527                  * here. First of all because it may return -EEXISTS. Another
528                  * reason is that, we do not want to stop proceeding because of
529                  * cache errors.
530                  */
531                 fld_cache_insert(fld->lcf_cache, seq, mds);
532         } else {
533                 CERROR("%s: Can't create FLD entry, rc %d\n",
534                        fld->lcf_name, rc);
535         }
536
537         RETURN(rc);
538 }
539 EXPORT_SYMBOL(fld_client_create);
540
541 int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
542                       const struct lu_env *env)
543 {
544         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
545         struct lu_fld_target *target;
546         int rc;
547         ENTRY;
548
549         fld->lcf_flags |= LUSTRE_FLD_RUN;
550         fld_cache_delete(fld->lcf_cache, seq);
551
552         target = fld_client_get_target(fld, seq);
553         LASSERT(target != NULL);
554
555         CDEBUG(D_INFO, "%s: Delete fld entry (seq: "LPX64") on "
556                "target %s (idx "LPU64")\n", fld->lcf_name, seq,
557                fld_target_name(target), target->ft_idx);
558
559 #ifdef __KERNEL__
560         if (target->ft_srv != NULL) {
561                 LASSERT(env != NULL);
562                 rc = fld_server_delete(target->ft_srv,
563                                        env, seq);
564         } else {
565 #endif
566                 rc = fld_client_rpc(target->ft_exp,
567                                     &md_fld, FLD_DELETE);
568 #ifdef __KERNEL__
569         }
570 #endif
571
572         RETURN(rc);
573 }
574 EXPORT_SYMBOL(fld_client_delete);
575
576 int fld_client_lookup(struct lu_client_fld *fld,
577                       seqno_t seq, mdsno_t *mds,
578                       const struct lu_env *env)
579 {
580         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
581         struct lu_fld_target *target;
582         int rc;
583         ENTRY;
584
585         fld->lcf_flags |= LUSTRE_FLD_RUN;
586
587         rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
588         if (rc == 0)
589                 RETURN(0);
590
591         /* Can not find it in the cache */
592         target = fld_client_get_target(fld, seq);
593         LASSERT(target != NULL);
594
595         CDEBUG(D_INFO, "%s: Lookup fld entry (seq: "LPX64") on "
596                "target %s (idx "LPU64")\n", fld->lcf_name, seq,
597                fld_target_name(target), target->ft_idx);
598
599 #ifdef __KERNEL__
600         if (target->ft_srv != NULL) {
601                 LASSERT(env != NULL);
602                 rc = fld_server_lookup(target->ft_srv,
603                                        env, seq, &md_fld.mf_mds);
604         } else {
605 #endif
606                 /*
607                  * insert the 'inflight' sequence. No need to protect that,
608                  * we are trying to reduce numbers of RPC but not restrict
609                  * to them exactly one 
610                  */
611                 fld_cache_insert_inflight(fld->lcf_cache, seq);
612                 rc = fld_client_rpc(target->ft_exp,
613                                     &md_fld, FLD_LOOKUP);
614 #ifdef __KERNEL__
615         }
616 #endif
617         if (seq < FID_SEQ_START) {
618                 /*
619                  * The current solution for IGIF is to bind it to mds0.
620                  * In the future, this should be fixed once IGIF can be found
621                  * in FLD.
622                  */ 
623                 md_fld.mf_mds = 0;
624                 rc = 0;
625         }
626
627         if (rc == 0) {
628                 *mds = md_fld.mf_mds;
629
630                 /*
631                  * Do not return error here as well. See previous comment in
632                  * same situation in function fld_client_create().
633                  */
634                 fld_cache_insert(fld->lcf_cache, seq, *mds);
635         } else {
636                 /* remove 'inflight' seq if it exists */
637                 fld_cache_delete(fld->lcf_cache, seq);
638         }
639         RETURN(rc);
640 }
641 EXPORT_SYMBOL(fld_client_lookup);
642
/*
 * Flush the client-side FLD cache of @fld. Does nothing in non-kernel
 * builds.
 */
void fld_client_flush(struct lu_client_fld *fld)
{
#ifdef __KERNEL__
        fld_cache_flush(fld->lcf_cache);
#endif /* __KERNEL__ */
}
649 EXPORT_SYMBOL(fld_client_flush);