Whamcloud - gitweb
b=18068
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/fid/fid_handler.c
37  *
38  * Lustre Sequence Manager
39  *
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_FID
47
48 #ifdef __KERNEL__
49 # include <libcfs/libcfs.h>
50 # include <linux/module.h>
51 #else /* __KERNEL__ */
52 # include <liblustre.h>
53 #endif
54
55 #include <obd.h>
56 #include <obd_class.h>
57 #include <dt_object.h>
58 #include <md_object.h>
59 #include <obd_support.h>
60 #include <lustre_req_layout.h>
61 #include <lustre_fid.h>
62 #include "fid_internal.h"
63
64 #ifdef __KERNEL__
65 /* Assigns client to sequence controller node. */
66 int seq_server_set_cli(struct lu_server_seq *seq,
67                        struct lu_client_seq *cli,
68                        const struct lu_env *env)
69 {
70         int rc = 0;
71         ENTRY;
72
73         /*
74          * Ask client for new range, assign that range to ->seq_space and write
75          * seq state to backing store should be atomic.
76          */
77         down(&seq->lss_sem);
78
79         if (cli == NULL) {
80                 CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
81                        seq->lss_name, cli->lcs_name);
82                 seq->lss_cli = cli;
83                 GOTO(out_up, rc = 0);
84         }
85
86         if (seq->lss_cli != NULL) {
87                 CERROR("%s: Sequence controller is already "
88                        "assigned\n", seq->lss_name);
89                 GOTO(out_up, rc = -EINVAL);
90         }
91
92         CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
93                seq->lss_name, cli->lcs_name);
94
95         seq->lss_cli = cli;
96         cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
97         EXIT;
98 out_up:
99         up(&seq->lss_sem);
100         return rc;
101 }
102 EXPORT_SYMBOL(seq_server_set_cli);
103
104 /**
105  * On controller node, allocate new super sequence for regular sequence server.
106  * As this super sequence controller, this node suppose to maintain fld
107  * and update index.
108  * \a out range always has currect mds node number of requester.
109  */
110
111 static int __seq_server_alloc_super(struct lu_server_seq *seq,
112                                     struct lu_seq_range *in,
113                                     struct lu_seq_range *out,
114                                     const struct lu_env *env)
115 {
116         struct lu_seq_range *space = &seq->lss_space;
117         struct thandle *th;
118         __u64 mdt = out->lsr_mdt;
119         int rc, credit;
120         ENTRY;
121
122         LASSERT(range_is_sane(space));
123
124         if (in != NULL) {
125                 CDEBUG(D_INFO, "%s: Input seq range: "
126                        DRANGE"\n", seq->lss_name, PRANGE(in));
127
128                 if (in->lsr_end > space->lsr_start)
129                         space->lsr_start = in->lsr_end;
130                 *out = *in;
131
132                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
133                        seq->lss_name, PRANGE(space));
134         } else {
135                 if (range_space(space) < seq->lss_width) {
136                         CWARN("%s: Sequences space to be exhausted soon. "
137                               "Only "LPU64" sequences left\n", seq->lss_name,
138                               range_space(space));
139                         *out = *space;
140                         space->lsr_start = space->lsr_end;
141                 } else if (range_is_exhausted(space)) {
142                         CERROR("%s: Sequences space is exhausted\n",
143                                seq->lss_name);
144                         RETURN(-ENOSPC);
145                 } else {
146                         range_alloc(out, space, seq->lss_width);
147                 }
148         }
149         out->lsr_mdt = mdt;
150
151         credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS;
152
153         th = seq_store_trans_start(seq, env, credit);
154         if (IS_ERR(th))
155                 RETURN(PTR_ERR(th));
156
157         rc = seq_store_write(seq, env, th);
158         if (rc) {
159                 CERROR("%s: Can't write space data, rc %d\n",
160                        seq->lss_name, rc);
161                 goto out;
162         }
163
164         rc = fld_server_create(seq->lss_site->ms_server_fld,
165                                env, out, th);
166         if (rc) {
167                 CERROR("%s: Can't Update fld database, rc %d\n",
168                        seq->lss_name, rc);
169         }
170
171 out:
172         seq_store_trans_stop(seq, env, th);
173
174         CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
175                DRANGE"\n", seq->lss_name, rc, PRANGE(out));
176
177         RETURN(rc);
178 }
179
180 int seq_server_alloc_super(struct lu_server_seq *seq,
181                            struct lu_seq_range *in,
182                            struct lu_seq_range *out,
183                            const struct lu_env *env)
184 {
185         int rc;
186         ENTRY;
187
188         down(&seq->lss_sem);
189         rc = __seq_server_alloc_super(seq, in, out, env);
190         up(&seq->lss_sem);
191
192         RETURN(rc);
193 }
194
195 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
196                                    struct lu_seq_range *in,
197                                    struct lu_seq_range *out,
198                                    const struct lu_env *env)
199 {
200         struct lu_seq_range *space = &seq->lss_space;
201         struct thandle *th;
202         int rc = 0;
203
204         ENTRY;
205
206         LASSERT(range_is_sane(space));
207
208         /*
209          * This is recovery case. Adjust super range if input range looks like
210          * it is allocated from new super.
211          */
212         if (in != NULL) {
213                 CDEBUG(D_INFO, "%s: Input seq range: "
214                        DRANGE"\n", seq->lss_name, PRANGE(in));
215
216                 if (range_is_exhausted(space)) {
217                         /*
218                          * Server cannot send empty range to client, this is why
219                          * we check here that range from client is "newer" than
220                          * exhausted super.
221                          */
222                         LASSERT(in->lsr_end > space->lsr_start);
223
224                         /*
225                          * Start is set to end of last allocated, because it
226                          * *is* already allocated so we take that into account
227                          * and do not use for other allocations.
228                          */
229                         space->lsr_start = in->lsr_end;
230
231                         /*
232                          * End is set to in->lsr_start + super sequence
233                          * allocation unit. That is because in->lsr_start is
234                          * first seq in new allocated range from controller
235                          * before failure.
236                          */
237                         space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH;
238
239                         if (!seq->lss_cli) {
240                                 CERROR("%s: No sequence controller "
241                                        "is attached.\n", seq->lss_name);
242                                 RETURN(-ENODEV);
243                         }
244
245                         /*
246                          * Let controller know that this is recovery and last
247                          * obtained range from it was @space.
248                          */
249                         rc = seq_client_replay_super(seq->lss_cli, space, env);
250
251                         if (rc) {
252                                 CERROR("%s: Can't replay super-sequence, "
253                                        "rc %d\n", seq->lss_name, rc);
254                                 RETURN(rc);
255                         }
256                 } else {
257                         /*
258                          * Update super start by end from client's range. Super
259                          * end should not be changed if range was not exhausted.
260                          */
261                         if (in->lsr_end > space->lsr_start)
262                                 space->lsr_start = in->lsr_end;
263                 }
264
265                 /* sending replay_super to update fld as only super sequence
266                  * server can update fld.
267                  * we are sending meta sequence to fld rather than super
268                  * sequence, but fld server can handle range merging. */
269
270                 in->lsr_mdt = space->lsr_mdt;
271                 rc = seq_client_replay_super(seq->lss_cli, in, env);
272
273                 if (rc) {
274                         CERROR("%s: Can't replay super-sequence, "
275                                         "rc %d\n", seq->lss_name, rc);
276                         RETURN(rc);
277                 }
278
279                 *out = *in;
280
281                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
282                        seq->lss_name, PRANGE(space));
283         } else {
284                 /*
285                  * XXX: Avoid cascading RPCs using kind of async preallocation
286                  * when meta-sequence is close to exhausting.
287                  */
288                 if (range_is_exhausted(space)) {
289                         if (!seq->lss_cli) {
290                                 CERROR("%s: No sequence controller "
291                                        "is attached.\n", seq->lss_name);
292                                 RETURN(-ENODEV);
293                         }
294
295                         rc = seq_client_alloc_super(seq->lss_cli, env);
296                         if (rc) {
297                                 CERROR("%s: Can't allocate super-sequence, "
298                                        "rc %d\n", seq->lss_name, rc);
299                                 RETURN(rc);
300                         }
301
302                         /* Saving new range to allocation space. */
303                         *space = seq->lss_cli->lcs_space;
304                         LASSERT(range_is_sane(space));
305                 }
306
307                 range_alloc(out, space, seq->lss_width);
308         }
309
310         th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
311         if (IS_ERR(th))
312                 RETURN(PTR_ERR(th));
313
314         rc = seq_store_write(seq, env, th);
315         if (rc) {
316                 CERROR("%s: Can't write space data, rc %d\n",
317                        seq->lss_name, rc);
318         }
319
320         if (rc == 0) {
321                 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
322                        DRANGE"\n", seq->lss_name, PRANGE(out));
323         }
324
325         seq_store_trans_stop(seq, env, th);
326         RETURN(rc);
327 }
328
329 int seq_server_alloc_meta(struct lu_server_seq *seq,
330                           struct lu_seq_range *in,
331                           struct lu_seq_range *out,
332                           const struct lu_env *env)
333 {
334         int rc;
335         ENTRY;
336
337         down(&seq->lss_sem);
338         rc = __seq_server_alloc_meta(seq, in, out, env);
339         up(&seq->lss_sem);
340
341         RETURN(rc);
342 }
343 EXPORT_SYMBOL(seq_server_alloc_meta);
344
345 static int seq_server_handle(struct lu_site *site,
346                              const struct lu_env *env,
347                              __u32 opc, struct lu_seq_range *in,
348                              struct lu_seq_range *out)
349 {
350         int rc;
351         struct md_site *mite;
352         ENTRY;
353
354         mite = lu_site2md(site);
355         switch (opc) {
356         case SEQ_ALLOC_META:
357                 if (!mite->ms_server_seq) {
358                         CERROR("Sequence server is not "
359                                "initialized\n");
360                         RETURN(-EINVAL);
361                 }
362                 rc = seq_server_alloc_meta(mite->ms_server_seq,
363                                            in, out, env);
364                 break;
365         case SEQ_ALLOC_SUPER:
366                 if (!mite->ms_control_seq) {
367                         CERROR("Sequence controller is not "
368                                "initialized\n");
369                         RETURN(-EINVAL);
370                 }
371                 rc = seq_server_alloc_super(mite->ms_control_seq,
372                                             in, out, env);
373                 break;
374         default:
375                 rc = -EINVAL;
376                 break;
377         }
378
379         RETURN(rc);
380 }
381
382 static int seq_req_handle(struct ptlrpc_request *req,
383                           const struct lu_env *env,
384                           struct seq_thread_info *info)
385 {
386         struct lu_seq_range *out, *in = NULL, *tmp;
387         struct lu_site *site;
388         int rc = -EPROTO;
389         __u32 *opc;
390         ENTRY;
391
392         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
393         LASSERT(site != NULL);
394                         
395         rc = req_capsule_server_pack(info->sti_pill);
396         if (rc)
397                 RETURN(err_serious(rc));
398
399         opc = req_capsule_client_get(info->sti_pill, &RMF_SEQ_OPC);
400         if (opc != NULL) {
401                 out = req_capsule_server_get(info->sti_pill, &RMF_SEQ_RANGE);
402                 if (out == NULL)
403                         RETURN(err_serious(-EPROTO));
404
405                 tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE);
406
407                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
408                         in = tmp;
409                         LASSERT(!range_is_zero(in) && range_is_sane(in));
410                 }
411                 /* seq client passed mdt id, we need to pass that using out
412                  * range parameter */
413
414                 out->lsr_mdt = tmp->lsr_mdt;
415                 rc = seq_server_handle(site, env, *opc, in, out);
416         } else
417                 rc = err_serious(-EPROTO);
418
419         RETURN(rc);
420 }
421
422 /* context key constructor/destructor: seq_key_init, seq_key_fini */
423 LU_KEY_INIT_FINI(seq, struct seq_thread_info);
424
425 /* context key: seq_thread_key */
426 LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD);
427
428 static void seq_thread_info_init(struct ptlrpc_request *req,
429                                  struct seq_thread_info *info)
430 {
431         info->sti_pill = &req->rq_pill;
432         /* Init request capsule */
433         req_capsule_init(info->sti_pill, req, RCL_SERVER);
434         req_capsule_set(info->sti_pill, &RQF_SEQ_QUERY);
435 }
436
437 static void seq_thread_info_fini(struct seq_thread_info *info)
438 {
439         req_capsule_fini(info->sti_pill);
440 }
441
442 static int seq_handle(struct ptlrpc_request *req)
443 {
444         const struct lu_env *env;
445         struct seq_thread_info *info;
446         int rc;
447
448         env = req->rq_svc_thread->t_env;
449         LASSERT(env != NULL);
450
451         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
452         LASSERT(info != NULL);
453
454         seq_thread_info_init(req, info);
455         rc = seq_req_handle(req, env, info);
456         seq_thread_info_fini(info);
457
458         return rc;
459 }
460
461 /*
462  * Entry point for handling FLD RPCs called from MDT.
463  */
464 int seq_query(struct com_thread_info *info)
465 {
466         return seq_handle(info->cti_pill->rc_req);
467 }
468 EXPORT_SYMBOL(seq_query);
469
470 static void seq_server_proc_fini(struct lu_server_seq *seq);
471
472 #ifdef LPROCFS
473 static int seq_server_proc_init(struct lu_server_seq *seq)
474 {
475         int rc;
476         ENTRY;
477
478         seq->lss_proc_dir = lprocfs_register(seq->lss_name,
479                                              seq_type_proc_dir,
480                                              NULL, NULL);
481         if (IS_ERR(seq->lss_proc_dir)) {
482                 rc = PTR_ERR(seq->lss_proc_dir);
483                 RETURN(rc);
484         }
485
486         rc = lprocfs_add_vars(seq->lss_proc_dir,
487                               seq_server_proc_list, seq);
488         if (rc) {
489                 CERROR("%s: Can't init sequence manager "
490                        "proc, rc %d\n", seq->lss_name, rc);
491                 GOTO(out_cleanup, rc);
492         }
493
494         RETURN(0);
495
496 out_cleanup:
497         seq_server_proc_fini(seq);
498         return rc;
499 }
500
501 static void seq_server_proc_fini(struct lu_server_seq *seq)
502 {
503         ENTRY;
504         if (seq->lss_proc_dir != NULL) {
505                 if (!IS_ERR(seq->lss_proc_dir))
506                         lprocfs_remove(&seq->lss_proc_dir);
507                 seq->lss_proc_dir = NULL;
508         }
509         EXIT;
510 }
511 #else
512 static int seq_server_proc_init(struct lu_server_seq *seq)
513 {
514         return 0;
515 }
516
517 static void seq_server_proc_fini(struct lu_server_seq *seq)
518 {
519         return;
520 }
521 #endif
522
523 int seq_server_init(struct lu_server_seq *seq,
524                     struct dt_device *dev,
525                     const char *prefix,
526                     enum lu_mgr_type type,
527                     struct md_site *ms,
528                     const struct lu_env *env)
529 {
530         struct thandle *th;
531         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
532         ENTRY;
533
534         LASSERT(dev != NULL);
535         LASSERT(prefix != NULL);
536
537         seq->lss_cli = NULL;
538         seq->lss_type = type;
539         seq->lss_site = ms;
540         range_init(&seq->lss_space);
541         sema_init(&seq->lss_sem, 1);
542
543         seq->lss_width = is_srv ?
544                 LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
545
546         snprintf(seq->lss_name, sizeof(seq->lss_name),
547                  "%s-%s", (is_srv ? "srv" : "ctl"), prefix);
548
549         rc = seq_store_init(seq, env, dev);
550         if (rc)
551                 GOTO(out, rc);
552         /* Request backing store for saved sequence info. */
553         rc = seq_store_read(seq, env);
554         if (rc == -ENODATA) {
555
556                 /* Nothing is read, init by default value. */
557                 seq->lss_space = is_srv ?
558                         LUSTRE_SEQ_ZERO_RANGE:
559                         LUSTRE_SEQ_SPACE_RANGE;
560
561                 seq->lss_space.lsr_mdt = ms->ms_node_id;
562                 CDEBUG(D_INFO, "%s: No data found "
563                        "on store. Initialize space\n",
564                        seq->lss_name);
565
566                 th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
567                 if (IS_ERR(th))
568                         RETURN(PTR_ERR(th));
569
570                 /* Save default controller value to store. */
571                 rc = seq_store_write(seq, env, th);
572                 if (rc) {
573                         CERROR("%s: Can't write space data, "
574                                "rc %d\n", seq->lss_name, rc);
575                 }
576                 seq_store_trans_stop(seq, env, th);
577         } else if (rc) {
578                 CERROR("%s: Can't read space data, rc %d\n",
579                        seq->lss_name, rc);
580                 GOTO(out, rc);
581         }
582
583         if (is_srv) {
584                 LASSERT(range_is_sane(&seq->lss_space));
585         } else {
586                 LASSERT(!range_is_zero(&seq->lss_space) &&
587                         range_is_sane(&seq->lss_space));
588         }
589
590         rc  = seq_server_proc_init(seq);
591         if (rc)
592                 GOTO(out, rc);
593
594         EXIT;
595 out:
596         if (rc)
597                 seq_server_fini(seq, env);
598         return rc;
599 }
600 EXPORT_SYMBOL(seq_server_init);
601
602 void seq_server_fini(struct lu_server_seq *seq,
603                      const struct lu_env *env)
604 {
605         ENTRY;
606
607         seq_server_proc_fini(seq);
608         seq_store_fini(seq, env);
609
610         EXIT;
611 }
612 EXPORT_SYMBOL(seq_server_fini);
613
614 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
615
616 static struct lu_local_obj_desc llod_seq_srv = {
617         .llod_name      = LUSTRE_SEQ_SRV_NAME,
618         .llod_oid       = FID_SEQ_SRV_OID,
619         .llod_is_index  = 0,
620 };
621
622 static struct lu_local_obj_desc llod_seq_ctl = {
623         .llod_name      = LUSTRE_SEQ_CTL_NAME,
624         .llod_oid       = FID_SEQ_CTL_OID,
625         .llod_is_index  = 0,
626 };
627
628 static int __init fid_mod_init(void)
629 {
630         seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
631                                              proc_lustre_root,
632                                              NULL, NULL);
633         if (IS_ERR(seq_type_proc_dir))
634                 return PTR_ERR(seq_type_proc_dir);
635
636         llo_local_obj_register(&llod_seq_srv);
637         llo_local_obj_register(&llod_seq_ctl);
638
639         LU_CONTEXT_KEY_INIT(&seq_thread_key);
640         lu_context_key_register(&seq_thread_key);
641         return 0;
642 }
643
644 static void __exit fid_mod_exit(void)
645 {
646         llo_local_obj_unregister(&llod_seq_srv);
647         llo_local_obj_unregister(&llod_seq_ctl);
648
649         lu_context_key_degister(&seq_thread_key);
650         if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
651                 lprocfs_remove(&seq_type_proc_dir);
652                 seq_type_proc_dir = NULL;
653         }
654 }
655
656 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
657 MODULE_DESCRIPTION("Lustre FID Module");
658 MODULE_LICENSE("GPL");
659
660 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);
661 #endif