Whamcloud - gitweb
- many fixes in fld, adding it to LMv and CMM
[fs/lustre-release.git] / lustre / fld / fld_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_handler.c
5  *
6  *  Copyright (C) 2006 Cluster File Systems, Inc.
7  *   Author: WangDi <wangdi@clusterfs.com>
8  *           Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_FLD
32
33 #ifdef __KERNEL__
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
41 #endif
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
54
#ifdef __KERNEL__
/* XXX: maybe these 2 items should go to sbi */
/* Global client-side seq->mds cache; allocated in fld_init(), freed in
 * fld_fini(). */
struct fld_cache_info *fld_cache = NULL;

/* Hash-table geometry for the client FLD cache. */
enum {
        FLD_HTABLE_BITS = 8,                      /* log2 of bucket count */
        FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS), /* 256 buckets */
        FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1     /* bucket-index mask */
};
64
65 static __u32 fld_cache_hash(__u64 seq)
66 {
67         return seq;
68 }
69
70 static int
71 fld_cache_insert(struct fld_cache_info *fld_cache,
72                  __u64 seq, __u64 mds)
73 {
74         struct fld_cache *fld;
75         struct hlist_head *bucket;
76         struct hlist_node *scan;
77         int rc = 0;
78         ENTRY;
79
80         bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
81                                         fld_cache->fld_hash_mask);
82
83         OBD_ALLOC_PTR(fld);
84         if (!fld)
85                 RETURN(-ENOMEM);
86
87         INIT_HLIST_NODE(&fld->fld_list);
88         fld->fld_mds = mds;
89         fld->fld_seq = seq;
90
91         spin_lock(&fld_cache->fld_lock);
92         hlist_for_each_entry(fld, scan, bucket, fld_list) {
93                 if (fld->fld_seq == seq) {
94                         spin_unlock(&fld_cache->fld_lock);
95                         GOTO(exit, rc = -EEXIST);
96                 }
97         }
98         hlist_add_head(&fld->fld_list, bucket);
99         spin_unlock(&fld_cache->fld_lock);
100 exit:
101         if (rc != 0)
102                 OBD_FREE(fld, sizeof(*fld));
103         RETURN(rc);
104 }
105
106 static struct fld_cache *
107 fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
108 {
109         struct hlist_head *bucket;
110         struct hlist_node *scan;
111         struct fld_cache *fld;
112         ENTRY;
113
114         bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
115                                         fld_cache->fld_hash_mask);
116
117         spin_lock(&fld_cache->fld_lock);
118         hlist_for_each_entry(fld, scan, bucket, fld_list) {
119                 if (fld->fld_seq == seq) {
120                         spin_unlock(&fld_cache->fld_lock);
121                         RETURN(fld);
122                 }
123         }
124         spin_unlock(&fld_cache->fld_lock);
125
126         RETURN(NULL);
127 }
128
129 static void
130 fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
131 {
132         struct hlist_head *bucket;
133         struct hlist_node *scan;
134         struct fld_cache *fld;
135         ENTRY;
136
137         bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
138                                         fld_cache->fld_hash_mask);
139
140         spin_lock(&fld_cache->fld_lock);
141         hlist_for_each_entry(fld, scan, bucket, fld_list) {
142                 if (fld->fld_seq == seq) {
143                         hlist_del_init(&fld->fld_list);
144                         GOTO(out_unlock, 0);
145                 }
146         }
147
148         EXIT;
149 out_unlock:
150         spin_unlock(&fld_cache->fld_lock);
151         return;
152 }
153 #endif
154
155 static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
156 {
157         return do_div(seq, fld->fld_count);
158 }
159
160 static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
161 {
162         CWARN("using Round Robin hash func for while\n");
163         return do_div(seq, fld->fld_count);
164 }
165
/* Export-selection policies, indexed by the hash id passed to
 * fld_client_init(); the zeroed element terminates the table. */
static struct lu_fld_hash fld_hash[3] = {
        {
                .fh_name = "DHT",
                .fh_func = fld_dht_hash
        },
        {
                .fh_name = "Round Robin",
                .fh_func = fld_rrb_hash
        },
        {
                0,
        }
};
179
180 static struct obd_export *
181 fld_client_get_exp(struct lu_client_fld *fld, __u64 seq)
182 {
183         struct obd_export *fld_exp;
184         int count = 0, hash;
185         ENTRY;
186
187         hash = fld->fld_hash->fh_func(fld, seq);
188
189         spin_lock(&fld->fld_lock);
190         list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
191                 if (count == hash)
192                         break;
193                 count++;
194         }
195         spin_unlock(&fld->fld_lock);
196
197         RETURN(fld_exp);
198 }
199
200 /* add export to FLD. This is usually done by CMM and LMV as they are main users
201  * of FLD module. */
202 int fld_client_add_export(struct lu_client_fld *fld,
203                           struct obd_export *exp)
204 {
205         struct obd_export *fld_exp;
206         ENTRY;
207
208         LASSERT(exp != NULL);
209
210         spin_lock(&fld->fld_lock);
211         list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
212                 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
213                                     &exp->exp_client_uuid))
214                 {
215                         spin_unlock(&fld->fld_lock);
216                         RETURN(-EEXIST);
217                 }
218         }
219         
220         fld_exp = class_export_get(exp);
221         list_add_tail(&exp->exp_fld_chain,
222                       &fld->fld_exports);
223         fld->fld_count++;
224         
225         spin_unlock(&fld->fld_lock);
226         
227         RETURN(0);
228 }
229 EXPORT_SYMBOL(fld_client_add_export);
230
231 /* remove export from FLD */
232 int fld_client_del_export(struct lu_client_fld *fld,
233                           struct obd_export *exp)
234 {
235         struct obd_export *fld_exp;
236         struct obd_export *tmp;
237         ENTRY;
238
239         spin_lock(&fld->fld_lock);
240         list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
241                 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
242                                     &exp->exp_client_uuid))
243                 {
244                         fld->fld_count--;
245                         list_del(&fld_exp->exp_fld_chain);
246                         class_export_get(fld_exp);
247
248                         spin_unlock(&fld->fld_lock);
249                         RETURN(0);
250                 }
251         }
252         spin_unlock(&fld->fld_lock);
253         
254         RETURN(-ENOENT);
255 }
256 EXPORT_SYMBOL(fld_client_del_export);
257
258 int fld_client_init(struct lu_client_fld *fld, int hash)
259 {
260         int rc = 0;
261         ENTRY;
262
263         LASSERT(fld != NULL);
264
265         if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
266                 CERROR("wrong hash function 0x%x\n", hash);
267                 RETURN(-EINVAL);
268         }
269         
270         INIT_LIST_HEAD(&fld->fld_exports);
271         spin_lock_init(&fld->fld_lock);
272         fld->fld_hash = &fld_hash[hash];
273         fld->fld_count = 0;
274         
275         CDEBUG(D_INFO, "Client FLD initialized, using %s\n",
276                fld->fld_hash->fh_name);
277         RETURN(rc);
278 }
279 EXPORT_SYMBOL(fld_client_init);
280
281 void fld_client_fini(struct lu_client_fld *fld)
282 {
283         struct obd_export *fld_exp;
284         struct obd_export *tmp;
285         ENTRY;
286
287         spin_lock(&fld->fld_lock);
288         list_for_each_entry_safe(fld_exp, tmp,
289                                  &fld->fld_exports, exp_fld_chain) {
290                 fld->fld_count--;
291                 list_del(&fld_exp->exp_fld_chain);
292                 class_export_get(fld_exp);
293         }
294         spin_unlock(&fld->fld_lock);
295         CDEBUG(D_INFO, "Client FLD finalized\n");
296         EXIT;
297 }
298 EXPORT_SYMBOL(fld_client_fini);
299
300 static int
301 fld_client_rpc(struct obd_export *exp,
302                struct md_fld *mf, __u32 fld_op)
303 {
304         struct ptlrpc_request *req;
305         struct md_fld *pmf;
306         int mf_size = sizeof(*mf);
307         __u32 *op;
308         int size[2] = {sizeof(*op), mf_size}, rc;
309         ENTRY;
310
311         req = ptlrpc_prep_req(class_exp2cliimp(exp),
312                               LUSTRE_MDS_VERSION, FLD_QUERY,
313                               2, size, NULL);
314         if (req == NULL)
315                 RETURN(-ENOMEM);
316
317         op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
318         *op = fld_op;
319
320         pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
321         memcpy(pmf, mf, sizeof(*mf));
322
323         req->rq_replen = lustre_msg_size(1, &mf_size);
324         req->rq_request_portal = MDS_FLD_PORTAL;
325         rc = ptlrpc_queue_wait(req);
326         if (rc)
327                 GOTO(out_req, rc);
328
329         pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
330                                  lustre_swab_md_fld);
331         *mf = *pmf; 
332 out_req:
333         ptlrpc_req_finished(req);
334         RETURN(rc);
335 }
336
/*
 * Register a seq->mds mapping with the server-side FLD.
 *
 * NOTE(review): the real implementation is compiled out with "#if 0"
 * below, so this is currently a no-op that always reports success --
 * confirm the stub is intentional for this development stage.
 */
int
fld_client_create(struct lu_client_fld *fld,
                  __u64 seq, __u64 mds)
{
#if 0
        struct obd_export *fld_exp;
        struct md_fld      md_fld;
        __u32 rc;
        ENTRY;

        fld_exp = fld_client_get_exp(fld, seq);
        if (!fld_exp)
                RETURN(-EINVAL);

        md_fld.mf_seq = seq;
        md_fld.mf_mds = mds;

        rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);

#ifdef __KERNEL__
        fld_cache_insert(fld_cache, seq, mds);
#endif

        RETURN(rc);
#endif
        return 0;
}
EXPORT_SYMBOL(fld_client_create);
365
366 int
367 fld_client_delete(struct lu_client_fld *fld,
368                   __u64 seq, __u64 mds)
369 {
370         struct obd_export *fld_exp;
371         struct md_fld      md_fld;
372         __u32 rc;
373
374 #ifdef __KERNEL__
375         fld_cache_delete(fld_cache, seq);
376 #endif
377         
378         fld_exp = fld_client_get_exp(fld, seq);
379         if (!fld_exp)
380                 RETURN(-EINVAL);
381
382         md_fld.mf_seq = seq;
383         md_fld.mf_mds = mds;
384
385         rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
386         RETURN(rc);
387 }
388 EXPORT_SYMBOL(fld_client_delete);
389
390 int
391 fld_client_get(struct lu_client_fld *fld,
392                __u64 seq, __u64 *mds)
393 {
394         struct obd_export *fld_exp;
395         struct md_fld      md_fld;
396         int    vallen, rc;
397
398         fld_exp = fld_client_get_exp(fld, seq);
399         if (!fld_exp);
400                 RETURN(-EINVAL);
401
402         md_fld.mf_seq = seq;
403         vallen = sizeof(struct md_fld);
404
405         rc = fld_client_rpc(fld_exp, &md_fld, FLD_GET);
406         if (rc == 0)
407                 *mds = md_fld.mf_mds;
408
409         RETURN(rc);
410 }
411
412 /* lookup fid in the namespace of pfid according to the name */
/*
 * Resolve the mds serving @seq, preferring the local cache and falling
 * back to an FLD_GET RPC. (The previous header comment, "lookup fid in
 * the namespace of pfid according to the name", was copied from a
 * name-lookup function and did not describe this code.)
 *
 * NOTE(review): the implementation is compiled out with "#if 0"; the
 * active code always reports mds 0 -- confirm the stub is intentional.
 */
int
fld_client_lookup(struct lu_client_fld *fld,
                  __u64 seq, __u64 *mds)
{
#if 0
#ifdef __KERNEL__
        struct fld_cache *fld_entry;
#endif
        int rc;
        ENTRY;

#ifdef __KERNEL__
        /* lookup it in the cache */
        fld_entry = fld_cache_lookup(fld_cache, seq);
        if (fld_entry != NULL) {
                *mds = fld_entry->fld_mds;
                RETURN(0);
        }
#endif

        /* can not find it in the cache */
        rc = fld_client_get(fld, seq, mds);
        if (rc)
                RETURN(rc);

#ifdef __KERNEL__
        rc = fld_cache_insert(fld_cache, seq, *mds);
#endif

        RETURN(rc);
#endif
        *mds = 0;
        return 0;
}
EXPORT_SYMBOL(fld_client_lookup);
448
449 #ifdef __KERNEL__
450 static int fld_init(void)
451 {
452         ENTRY;
453
454         OBD_ALLOC_PTR(fld_cache);
455         if (fld_cache == NULL)
456                 RETURN(-ENOMEM);
457
458         /* init fld cache info */
459         fld_cache->fld_hash_mask = FLD_HTABLE_MASK;
460         OBD_ALLOC(fld_cache->fld_hash, FLD_HTABLE_SIZE *
461                   sizeof fld_cache->fld_hash[0]);
462         spin_lock_init(&fld_cache->fld_lock);
463
464         CDEBUG(D_INFO, "Client FLD, cache size %d\n",
465                FLD_HTABLE_SIZE);
466         
467         RETURN(0);
468 }
469
470 static int fld_fini(void)
471 {
472         if (fld_cache != NULL) {
473                 OBD_FREE(fld_cache->fld_hash, FLD_HTABLE_SIZE *
474                          sizeof fld_cache->fld_hash[0]);
475                 OBD_FREE_PTR(fld_cache);
476         }
477         return 0;
478 }
479
480 static int __init fld_mod_init(void)
481 {
482         fld_init();
483         return 0;
484 }
485
486 static void __exit fld_mod_exit(void)
487 {
488         fld_fini();
489         return;
490 }
491
492
493 static struct fld_list fld_list_head;
494
/*
 * Dispatch one server-side FLD operation (@opts) against @mf.
 * For FLD_GET the answer is written into mf->mf_mds.
 *
 * Returns 0 or a negative errno; unknown opcodes yield -EINVAL.
 */
static int
fld_server_handle(struct lu_server_fld *fld,
                  const struct lu_context *ctx,
                  __u32 opts, struct md_fld *mf)
{
        int rc;
        ENTRY;

        switch (opts) {
        case FLD_CREATE:
                rc = fld_handle_insert(fld, ctx, mf->mf_seq, mf->mf_mds);
                break;
        case FLD_DELETE:
                rc = fld_handle_delete(fld, ctx, mf->mf_seq);
                break;
        case FLD_GET:
                rc = fld_handle_lookup(fld, ctx, mf->mf_seq, &mf->mf_mds);
                break;
        default:
                rc = -EINVAL;
                break;
        }
        RETURN(rc);

}
520
/*
 * Body of the FLD_QUERY handler: unpack the opcode and md_fld payload
 * from the request capsule, run fld_server_handle(), and leave the
 * result in the reply buffer.
 *
 * Returns 0 or a negative errno (the caller packs it into the reply).
 */
static int
fld_req_handle0(const struct lu_context *ctx,
                struct lu_server_fld *fld,
                struct ptlrpc_request *req)
{
        int rep_buf_size[3] = { 0, };
        struct req_capsule pill;
        struct md_fld *in;
        struct md_fld *out;
        int rc = -EPROTO;
        __u32 *opc;
        ENTRY;

        req_capsule_init(&pill, req, RCL_SERVER,
                         rep_buf_size);

        req_capsule_set(&pill, &RQF_FLD_QUERY);
        req_capsule_pack(&pill);

        opc = req_capsule_client_get(&pill, &RMF_FLD_OPC);
        if (opc != NULL) {
                in = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
                if (in == NULL) {
                        CERROR("cannot unpack fld request\n");
                        GOTO(out_pill, rc = -EPROTO);
                }
                out = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
                if (out == NULL) {
                        CERROR("cannot allocate fld response\n");
                        GOTO(out_pill, rc = -EPROTO);
                }
                /* operate on the reply copy so FLD_GET can fill in
                 * out->mf_mds in place */
                *out = *in;
                rc = fld_server_handle(fld, ctx, *opc, out);
        } else {
                CERROR("cannot unpack FLD operation\n");
        }

out_pill:
        EXIT;
        req_capsule_fini(&pill);
        return rc;
}
563
564
/*
 * Top-level ptlrpc handler for the FLD service: validate the request,
 * route FLD_QUERY to fld_req_handle0(), and send the reply.
 *
 * Returns 0 after replying, or ptlrpc_error()'s result for unsupported
 * opcodes.
 */
static int fld_req_handle(struct ptlrpc_request *req)
{
        int fail = OBD_FAIL_FLD_ALL_REPLY_NET;
        const struct lu_context *ctx;
        struct lu_site    *site;
        int rc = -EPROTO;
        ENTRY;

        /* fault-injection point: drop the reply once when the matching
         * OBD_FAIL flag is set */
        OBD_FAIL_RETURN(OBD_FAIL_FLD_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);

        ctx = req->rq_svc_thread->t_ctx;
        LASSERT(ctx != NULL);
        LASSERT(ctx->lc_thread == req->rq_svc_thread);
        if (req->rq_reqmsg->opc == FLD_QUERY) {
                if (req->rq_export != NULL) {
                        /* the server-side FLD instance lives in the site */
                        site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
                        LASSERT(site != NULL);
                        rc = fld_req_handle0(ctx, site->ls_fld, req);
                } else {
                        CERROR("Unconnected request\n");
                        req->rq_status = -ENOTCONN;
                        GOTO(out, rc = -ENOTCONN);
                }
        } else {
                CERROR("Wrong opcode: %d\n", req->rq_reqmsg->opc);
                req->rq_status = -ENOTSUPP;
                rc = ptlrpc_error(req);
                RETURN(rc);
        }

        EXIT;
out:
        target_send_reply(req, rc, fail);
        return 0;
}
600
/*
 * Initialize the server-side FLD: pin @dt, set up the index via
 * fld_iam_init(), then start the FLD ptlrpc service and its threads.
 * Any failure tears down the partial state through fld_server_fini().
 *
 * Returns 0 or a negative errno.
 */
int
fld_server_init(struct lu_server_fld *fld,
                const struct lu_context *ctx,
                struct dt_device *dt)
{
        int rc;
        struct ptlrpc_service_conf fld_conf = {
                .psc_nbufs            = MDS_NBUFS,
                .psc_bufsize          = MDS_BUFSIZE,
                .psc_max_req_size     = MDS_MAXREQSIZE,
                .psc_max_reply_size   = MDS_MAXREPSIZE,
                .psc_req_portal       = MDS_FLD_PORTAL,
                .psc_rep_portal       = MDC_REPLY_PORTAL,
                .psc_watchdog_timeout = FLD_SERVICE_WATCHDOG_TIMEOUT,
                .psc_num_threads      = FLD_NUM_THREADS
        };
        ENTRY;

        fld->fld_dt = dt;
        /* hold the dt device for the server FLD's lifetime; dropped in
         * fld_server_fini() */
        lu_device_get(&dt->dd_lu_dev);
        INIT_LIST_HEAD(&fld_list_head.fld_list);
        spin_lock_init(&fld_list_head.fld_lock);

        rc = fld_iam_init(fld, ctx);

        if (rc == 0) {
                fld->fld_service =
                        ptlrpc_init_svc_conf(&fld_conf, fld_req_handle,
                                             LUSTRE_FLD0_NAME,
                                             fld->fld_proc_entry, NULL);
                if (fld->fld_service != NULL)
                        rc = ptlrpc_start_threads(NULL, fld->fld_service,
                                                  LUSTRE_FLD0_NAME);
                else
                        rc = -ENOMEM;
        }

        if (rc != 0)
                fld_server_fini(fld, ctx);
        else
                CDEBUG(D_INFO, "Server FLD initialized\n");
        RETURN(rc);
}
EXPORT_SYMBOL(fld_server_init);
645
646 void
647 fld_server_fini(struct lu_server_fld *fld,
648                 const struct lu_context *ctx)
649 {
650         struct list_head *pos, *n;
651         ENTRY;
652
653         if (fld->fld_service != NULL) {
654                 ptlrpc_unregister_service(fld->fld_service);
655                 fld->fld_service = NULL;
656         }
657
658         spin_lock(&fld_list_head.fld_lock);
659         list_for_each_safe(pos, n, &fld_list_head.fld_list) {
660                 struct fld_item *fld = list_entry(pos, struct fld_item,
661                                                   fld_list);
662                 list_del_init(&fld->fld_list);
663                 OBD_FREE_PTR(fld);
664         }
665         spin_unlock(&fld_list_head.fld_lock);
666         if (fld->fld_dt != NULL) {
667                 lu_device_put(&fld->fld_dt->dd_lu_dev);
668                 fld_iam_fini(fld, ctx);
669                 fld->fld_dt = NULL;
670         }
671         CDEBUG(D_INFO, "Server FLD finalized\n");
672         EXIT;
673 }
674 EXPORT_SYMBOL(fld_server_fini);
675
676 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
677 MODULE_DESCRIPTION("Lustre FLD");
678 MODULE_LICENSE("GPL");
679
680 cfs_module(mdd, "0.0.4", fld_mod_init, fld_mod_exit);
681 #endif