Whamcloud - gitweb
253ea38ae69d773701e7087af3d98bd5337c3c71
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/fid/fid_handler.c
37  *
38  * Lustre Sequence Manager
39  *
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_FID
47
48 #ifdef __KERNEL__
49 # include <libcfs/libcfs.h>
50 # include <linux/module.h>
51 #else /* __KERNEL__ */
52 # include <liblustre.h>
53 #endif
54
55 #include <obd.h>
56 #include <obd_class.h>
57 #include <dt_object.h>
58 #include <md_object.h>
59 #include <obd_support.h>
60 #include <lustre_req_layout.h>
61 #include <lustre_fid.h>
62 #include "fid_internal.h"
63
64 #ifdef __KERNEL__
65 /* Assigns client to sequence controller node. */
66 int seq_server_set_cli(struct lu_server_seq *seq,
67                        struct lu_client_seq *cli,
68                        const struct lu_env *env)
69 {
70         int rc = 0;
71         ENTRY;
72
73         /*
74          * Ask client for new range, assign that range to ->seq_space and write
75          * seq state to backing store should be atomic.
76          */
77         cfs_down(&seq->lss_sem);
78
79         if (cli == NULL) {
80                 CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
81                        seq->lss_name, cli->lcs_name);
82                 seq->lss_cli = cli;
83                 GOTO(out_up, rc = 0);
84         }
85
86         if (seq->lss_cli != NULL) {
87                 CERROR("%s: Sequence controller is already "
88                        "assigned\n", seq->lss_name);
89                 GOTO(out_up, rc = -EINVAL);
90         }
91
92         CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
93                seq->lss_name, cli->lcs_name);
94
95         seq->lss_cli = cli;
96         cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
97         EXIT;
98 out_up:
99         cfs_up(&seq->lss_sem);
100         return rc;
101 }
102 EXPORT_SYMBOL(seq_server_set_cli);
103
104 /**
105  * On controller node, allocate new super sequence for regular sequence server.
106  * As this super sequence controller, this node suppose to maintain fld
107  * and update index.
108  * \a out range always has currect mds node number of requester.
109  */
110
111 static int __seq_server_alloc_super(struct lu_server_seq *seq,
112                                     struct lu_seq_range *in,
113                                     struct lu_seq_range *out,
114                                     const struct lu_env *env)
115 {
116         struct lu_seq_range *space = &seq->lss_space;
117         struct thandle *th;
118         __u64 mdt = out->lsr_mdt;
119         int rc, credit;
120         ENTRY;
121
122         LASSERT(range_is_sane(space));
123
124         if (in != NULL) {
125                 CDEBUG(D_INFO, "%s: Input seq range: "
126                        DRANGE"\n", seq->lss_name, PRANGE(in));
127
128                 if (in->lsr_end > space->lsr_start)
129                         space->lsr_start = in->lsr_end;
130                 *out = *in;
131
132                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
133                        seq->lss_name, PRANGE(space));
134         } else {
135                 if (range_space(space) < seq->lss_width) {
136                         CWARN("%s: Sequences space to be exhausted soon. "
137                               "Only "LPU64" sequences left\n", seq->lss_name,
138                               range_space(space));
139                         *out = *space;
140                         space->lsr_start = space->lsr_end;
141                 } else if (range_is_exhausted(space)) {
142                         CERROR("%s: Sequences space is exhausted\n",
143                                seq->lss_name);
144                         RETURN(-ENOSPC);
145                 } else {
146                         range_alloc(out, space, seq->lss_width);
147                 }
148         }
149         out->lsr_mdt = mdt;
150
151         credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS;
152
153         th = seq_store_trans_start(seq, env, credit);
154         if (IS_ERR(th))
155                 RETURN(PTR_ERR(th));
156
157         rc = seq_store_write(seq, env, th);
158         if (rc) {
159                 CERROR("%s: Can't write space data, rc %d\n",
160                        seq->lss_name, rc);
161                 goto out;
162         }
163
164         rc = fld_server_create(seq->lss_site->ms_server_fld,
165                                env, out, th);
166         if (rc) {
167                 CERROR("%s: Can't Update fld database, rc %d\n",
168                        seq->lss_name, rc);
169         }
170
171 out:
172         seq_store_trans_stop(seq, env, th);
173
174         CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
175                DRANGE"\n", seq->lss_name, rc, PRANGE(out));
176
177         RETURN(rc);
178 }
179
180 int seq_server_alloc_super(struct lu_server_seq *seq,
181                            struct lu_seq_range *in,
182                            struct lu_seq_range *out,
183                            const struct lu_env *env)
184 {
185         int rc;
186         ENTRY;
187
188         cfs_down(&seq->lss_sem);
189         rc = __seq_server_alloc_super(seq, in, out, env);
190         cfs_up(&seq->lss_sem);
191
192         RETURN(rc);
193 }
194
195 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
196                                    struct lu_seq_range *in,
197                                    struct lu_seq_range *out,
198                                    const struct lu_env *env)
199 {
200         struct lu_seq_range *space = &seq->lss_space;
201         struct thandle *th;
202         int rc = 0;
203
204         ENTRY;
205
206         LASSERT(range_is_sane(space));
207
208         /*
209          * This is recovery case. Adjust super range if input range looks like
210          * it is allocated from new super.
211          */
212         if (in != NULL) {
213                 CDEBUG(D_INFO, "%s: Input seq range: "
214                        DRANGE"\n", seq->lss_name, PRANGE(in));
215
216                 if (in->lsr_end <= space->lsr_start) {
217                         /*
218                          * Client is replaying a fairly old range, server
219                          * don't need to do any allocation.
220                          */
221                 } else if (range_is_exhausted(space)) {
222                         /*
223                          * Start is set to end of last allocated, because it
224                          * *is* already allocated so we take that into account
225                          * and do not use for other allocations.
226                          */
227                         space->lsr_start = in->lsr_end;
228
229                         /*
230                          * End is set to in->lsr_start + super sequence
231                          * allocation unit. That is because in->lsr_start is
232                          * first seq in new allocated range from controller
233                          * before failure.
234                          */
235                         space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH;
236
237                         if (!seq->lss_cli) {
238                                 CERROR("%s: No sequence controller "
239                                        "is attached.\n", seq->lss_name);
240                                 RETURN(-ENODEV);
241                         }
242
243                         /*
244                          * Let controller know that this is recovery and last
245                          * obtained range from it was @space.
246                          */
247                         rc = seq_client_replay_super(seq->lss_cli, space, env);
248
249                         if (rc) {
250                                 CERROR("%s: Can't replay super-sequence, "
251                                        "rc %d\n", seq->lss_name, rc);
252                                 RETURN(rc);
253                         }
254                 } else {
255                         /*
256                          * Update super start by end from client's range. Super
257                          * end should not be changed if range was not exhausted.
258                          */
259                         space->lsr_start = in->lsr_end;
260                 }
261
262                 /* sending replay_super to update fld as only super sequence
263                  * server can update fld.
264                  * we are sending meta sequence to fld rather than super
265                  * sequence, but fld server can handle range merging. */
266
267                 in->lsr_mdt = space->lsr_mdt;
268                 rc = seq_client_replay_super(seq->lss_cli, in, env);
269
270                 if (rc) {
271                         CERROR("%s: Can't replay super-sequence, "
272                                         "rc %d\n", seq->lss_name, rc);
273                         RETURN(rc);
274                 }
275
276                 *out = *in;
277
278                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
279                        seq->lss_name, PRANGE(space));
280         } else {
281                 /*
282                  * XXX: Avoid cascading RPCs using kind of async preallocation
283                  * when meta-sequence is close to exhausting.
284                  */
285                 if (range_is_exhausted(space)) {
286                         if (!seq->lss_cli) {
287                                 CERROR("%s: No sequence controller "
288                                        "is attached.\n", seq->lss_name);
289                                 RETURN(-ENODEV);
290                         }
291
292                         rc = seq_client_alloc_super(seq->lss_cli, env);
293                         if (rc) {
294                                 CERROR("%s: Can't allocate super-sequence, "
295                                        "rc %d\n", seq->lss_name, rc);
296                                 RETURN(rc);
297                         }
298
299                         /* Saving new range to allocation space. */
300                         *space = seq->lss_cli->lcs_space;
301                         LASSERT(range_is_sane(space));
302                 }
303
304                 range_alloc(out, space, seq->lss_width);
305         }
306
307         th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
308         if (IS_ERR(th))
309                 RETURN(PTR_ERR(th));
310
311         rc = seq_store_write(seq, env, th);
312         if (rc) {
313                 CERROR("%s: Can't write space data, rc %d\n",
314                        seq->lss_name, rc);
315         }
316
317         if (rc == 0) {
318                 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
319                        DRANGE"\n", seq->lss_name, PRANGE(out));
320         }
321
322         seq_store_trans_stop(seq, env, th);
323         RETURN(rc);
324 }
325
326 int seq_server_alloc_meta(struct lu_server_seq *seq,
327                           struct lu_seq_range *in,
328                           struct lu_seq_range *out,
329                           const struct lu_env *env)
330 {
331         int rc;
332         ENTRY;
333
334         cfs_down(&seq->lss_sem);
335         rc = __seq_server_alloc_meta(seq, in, out, env);
336         cfs_up(&seq->lss_sem);
337
338         RETURN(rc);
339 }
340 EXPORT_SYMBOL(seq_server_alloc_meta);
341
342 static int seq_server_handle(struct lu_site *site,
343                              const struct lu_env *env,
344                              __u32 opc, struct lu_seq_range *in,
345                              struct lu_seq_range *out)
346 {
347         int rc;
348         struct md_site *mite;
349         ENTRY;
350
351         mite = lu_site2md(site);
352         switch (opc) {
353         case SEQ_ALLOC_META:
354                 if (!mite->ms_server_seq) {
355                         CERROR("Sequence server is not "
356                                "initialized\n");
357                         RETURN(-EINVAL);
358                 }
359                 rc = seq_server_alloc_meta(mite->ms_server_seq,
360                                            in, out, env);
361                 break;
362         case SEQ_ALLOC_SUPER:
363                 if (!mite->ms_control_seq) {
364                         CERROR("Sequence controller is not "
365                                "initialized\n");
366                         RETURN(-EINVAL);
367                 }
368                 rc = seq_server_alloc_super(mite->ms_control_seq,
369                                             in, out, env);
370                 break;
371         default:
372                 rc = -EINVAL;
373                 break;
374         }
375
376         RETURN(rc);
377 }
378
379 static int seq_req_handle(struct ptlrpc_request *req,
380                           const struct lu_env *env,
381                           struct seq_thread_info *info)
382 {
383         struct lu_seq_range *out, *in = NULL, *tmp;
384         struct lu_site *site;
385         int rc = -EPROTO;
386         __u32 *opc;
387         ENTRY;
388
389         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
390         LASSERT(site != NULL);
391
392         rc = req_capsule_server_pack(info->sti_pill);
393         if (rc)
394                 RETURN(err_serious(rc));
395
396         opc = req_capsule_client_get(info->sti_pill, &RMF_SEQ_OPC);
397         if (opc != NULL) {
398                 out = req_capsule_server_get(info->sti_pill, &RMF_SEQ_RANGE);
399                 if (out == NULL)
400                         RETURN(err_serious(-EPROTO));
401
402                 tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE);
403
404                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
405                         in = tmp;
406
407                         if (range_is_zero(in) || !range_is_sane(in)) {
408                                 CERROR("Replayed seq range is invalid: "
409                                        DRANGE"\n", PRANGE(in));
410                                 RETURN(err_serious(-EINVAL));
411                         }
412                 }
413                 /* seq client passed mdt id, we need to pass that using out
414                  * range parameter */
415
416                 out->lsr_mdt = tmp->lsr_mdt;
417                 rc = seq_server_handle(site, env, *opc, in, out);
418         } else
419                 rc = err_serious(-EPROTO);
420
421         RETURN(rc);
422 }
423
424 /* context key constructor/destructor: seq_key_init, seq_key_fini */
425 LU_KEY_INIT_FINI(seq, struct seq_thread_info);
426
427 /* context key: seq_thread_key */
428 LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD);
429
430 static void seq_thread_info_init(struct ptlrpc_request *req,
431                                  struct seq_thread_info *info)
432 {
433         info->sti_pill = &req->rq_pill;
434         /* Init request capsule */
435         req_capsule_init(info->sti_pill, req, RCL_SERVER);
436         req_capsule_set(info->sti_pill, &RQF_SEQ_QUERY);
437 }
438
439 static void seq_thread_info_fini(struct seq_thread_info *info)
440 {
441         req_capsule_fini(info->sti_pill);
442 }
443
444 static int seq_handle(struct ptlrpc_request *req)
445 {
446         const struct lu_env *env;
447         struct seq_thread_info *info;
448         int rc;
449
450         env = req->rq_svc_thread->t_env;
451         LASSERT(env != NULL);
452
453         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
454         LASSERT(info != NULL);
455
456         seq_thread_info_init(req, info);
457         rc = seq_req_handle(req, env, info);
458         seq_thread_info_fini(info);
459
460         return rc;
461 }
462
463 /*
464  * Entry point for handling FLD RPCs called from MDT.
465  */
466 int seq_query(struct com_thread_info *info)
467 {
468         return seq_handle(info->cti_pill->rc_req);
469 }
470 EXPORT_SYMBOL(seq_query);
471
472 static void seq_server_proc_fini(struct lu_server_seq *seq);
473
474 #ifdef LPROCFS
475 static int seq_server_proc_init(struct lu_server_seq *seq)
476 {
477         int rc;
478         ENTRY;
479
480         seq->lss_proc_dir = lprocfs_register(seq->lss_name,
481                                              seq_type_proc_dir,
482                                              NULL, NULL);
483         if (IS_ERR(seq->lss_proc_dir)) {
484                 rc = PTR_ERR(seq->lss_proc_dir);
485                 RETURN(rc);
486         }
487
488         rc = lprocfs_add_vars(seq->lss_proc_dir,
489                               seq_server_proc_list, seq);
490         if (rc) {
491                 CERROR("%s: Can't init sequence manager "
492                        "proc, rc %d\n", seq->lss_name, rc);
493                 GOTO(out_cleanup, rc);
494         }
495
496         RETURN(0);
497
498 out_cleanup:
499         seq_server_proc_fini(seq);
500         return rc;
501 }
502
503 static void seq_server_proc_fini(struct lu_server_seq *seq)
504 {
505         ENTRY;
506         if (seq->lss_proc_dir != NULL) {
507                 if (!IS_ERR(seq->lss_proc_dir))
508                         lprocfs_remove(&seq->lss_proc_dir);
509                 seq->lss_proc_dir = NULL;
510         }
511         EXIT;
512 }
513 #else
514 static int seq_server_proc_init(struct lu_server_seq *seq)
515 {
516         return 0;
517 }
518
519 static void seq_server_proc_fini(struct lu_server_seq *seq)
520 {
521         return;
522 }
523 #endif
524
525 int seq_server_init(struct lu_server_seq *seq,
526                     struct dt_device *dev,
527                     const char *prefix,
528                     enum lu_mgr_type type,
529                     struct md_site *ms,
530                     const struct lu_env *env)
531 {
532         struct thandle *th;
533         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
534         ENTRY;
535
536         LASSERT(dev != NULL);
537         LASSERT(prefix != NULL);
538
539         seq->lss_cli = NULL;
540         seq->lss_type = type;
541         seq->lss_site = ms;
542         range_init(&seq->lss_space);
543         cfs_sema_init(&seq->lss_sem, 1);
544
545         seq->lss_width = is_srv ?
546                 LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
547
548         snprintf(seq->lss_name, sizeof(seq->lss_name),
549                  "%s-%s", (is_srv ? "srv" : "ctl"), prefix);
550
551         rc = seq_store_init(seq, env, dev);
552         if (rc)
553                 GOTO(out, rc);
554         /* Request backing store for saved sequence info. */
555         rc = seq_store_read(seq, env);
556         if (rc == -ENODATA) {
557
558                 /* Nothing is read, init by default value. */
559                 seq->lss_space = is_srv ?
560                         LUSTRE_SEQ_ZERO_RANGE:
561                         LUSTRE_SEQ_SPACE_RANGE;
562
563                 seq->lss_space.lsr_mdt = ms->ms_node_id;
564                 CDEBUG(D_INFO, "%s: No data found "
565                        "on store. Initialize space\n",
566                        seq->lss_name);
567
568                 th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
569                 if (IS_ERR(th))
570                         RETURN(PTR_ERR(th));
571
572                 /* Save default controller value to store. */
573                 rc = seq_store_write(seq, env, th);
574                 if (rc) {
575                         CERROR("%s: Can't write space data, "
576                                "rc %d\n", seq->lss_name, rc);
577                 }
578                 seq_store_trans_stop(seq, env, th);
579         } else if (rc) {
580                 CERROR("%s: Can't read space data, rc %d\n",
581                        seq->lss_name, rc);
582                 GOTO(out, rc);
583         }
584
585         if (is_srv) {
586                 LASSERT(range_is_sane(&seq->lss_space));
587         } else {
588                 LASSERT(!range_is_zero(&seq->lss_space) &&
589                         range_is_sane(&seq->lss_space));
590         }
591
592         rc  = seq_server_proc_init(seq);
593         if (rc)
594                 GOTO(out, rc);
595
596         EXIT;
597 out:
598         if (rc)
599                 seq_server_fini(seq, env);
600         return rc;
601 }
602 EXPORT_SYMBOL(seq_server_init);
603
604 void seq_server_fini(struct lu_server_seq *seq,
605                      const struct lu_env *env)
606 {
607         ENTRY;
608
609         seq_server_proc_fini(seq);
610         seq_store_fini(seq, env);
611
612         EXIT;
613 }
614 EXPORT_SYMBOL(seq_server_fini);
615
616 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
617
618 static struct lu_local_obj_desc llod_seq_srv = {
619         .llod_name      = LUSTRE_SEQ_SRV_NAME,
620         .llod_oid       = FID_SEQ_SRV_OID,
621         .llod_is_index  = 0,
622 };
623
624 static struct lu_local_obj_desc llod_seq_ctl = {
625         .llod_name      = LUSTRE_SEQ_CTL_NAME,
626         .llod_oid       = FID_SEQ_CTL_OID,
627         .llod_is_index  = 0,
628 };
629
630 static int __init fid_mod_init(void)
631 {
632         seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
633                                              proc_lustre_root,
634                                              NULL, NULL);
635         if (IS_ERR(seq_type_proc_dir))
636                 return PTR_ERR(seq_type_proc_dir);
637
638         llo_local_obj_register(&llod_seq_srv);
639         llo_local_obj_register(&llod_seq_ctl);
640
641         LU_CONTEXT_KEY_INIT(&seq_thread_key);
642         lu_context_key_register(&seq_thread_key);
643         return 0;
644 }
645
646 static void __exit fid_mod_exit(void)
647 {
648         llo_local_obj_unregister(&llod_seq_srv);
649         llo_local_obj_unregister(&llod_seq_ctl);
650
651         lu_context_key_degister(&seq_thread_key);
652         if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
653                 lprocfs_remove(&seq_type_proc_dir);
654                 seq_type_proc_dir = NULL;
655         }
656 }
657
658 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
659 MODULE_DESCRIPTION("Lustre FID Module");
660 MODULE_LICENSE("GPL");
661
662 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);
663 #endif