Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 #ifdef __KERNEL__
51 /* Assigns client to sequence controller node. */
52 int seq_server_set_cli(struct lu_server_seq *seq,
53                        struct lu_client_seq *cli,
54                        const struct lu_env *env)
55 {
56         int rc = 0;
57         ENTRY;
58
59         /*
60          * Ask client for new range, assign that range to ->seq_space and write
61          * seq state to backing store should be atomic.
62          */
63         down(&seq->lss_sem);
64
65         if (cli == NULL) {
66                 CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
67                        seq->lss_name, cli->lcs_name);
68                 seq->lss_cli = cli;
69                 GOTO(out_up, rc = 0);
70         }
71
72         if (seq->lss_cli != NULL) {
73                 CERROR("%s: Sequence controller is already "
74                        "assigned\n", seq->lss_name);
75                 GOTO(out_up, rc = -EINVAL);
76         }
77
78         CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
79                seq->lss_name, cli->lcs_name);
80
81         seq->lss_cli = cli;
82         EXIT;
83 out_up:
84         up(&seq->lss_sem);
85         return rc;
86 }
87 EXPORT_SYMBOL(seq_server_set_cli);
88
89 /*
90  * On controller node, allocate new super sequence for regular sequence server.
91  */
92 static int __seq_server_alloc_super(struct lu_server_seq *seq,
93                                     struct lu_range *in,
94                                     struct lu_range *out,
95                                     const struct lu_env *env)
96 {
97         struct lu_range *space = &seq->lss_space;
98         int rc;
99         ENTRY;
100
101         LASSERT(range_is_sane(space));
102
103         if (in != NULL) {
104                 CDEBUG(D_INFO, "%s: Input seq range: "
105                        DRANGE"\n", seq->lss_name, PRANGE(in));
106
107                 if (in->lr_end > space->lr_start)
108                         space->lr_start = in->lr_end;
109                 *out = *in;
110
111                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
112                        seq->lss_name, PRANGE(space));
113         } else {
114                 if (range_space(space) < seq->lss_width) {
115                         CWARN("%s: Sequences space to be exhausted soon. "
116                               "Only "LPU64" sequences left\n", seq->lss_name,
117                               range_space(space));
118                         *out = *space;
119                         space->lr_start = space->lr_end;
120                 } else if (range_is_exhausted(space)) {
121                         CERROR("%s: Sequences space is exhausted\n",
122                                seq->lss_name);
123                         RETURN(-ENOSPC);
124                 } else {
125                         range_alloc(out, space, seq->lss_width);
126                 }
127         }
128
129         rc = seq_store_write(seq, env);
130         if (rc) {
131                 CERROR("%s: Can't write space data, rc %d\n",
132                        seq->lss_name, rc);
133                 RETURN(rc);
134         }
135
136         CDEBUG(D_INFO, "%s: Allocated super-sequence "
137                DRANGE"\n", seq->lss_name, PRANGE(out));
138
139         RETURN(rc);
140 }
141
142 int seq_server_alloc_super(struct lu_server_seq *seq,
143                            struct lu_range *in,
144                            struct lu_range *out,
145                            const struct lu_env *env)
146 {
147         int rc;
148         ENTRY;
149
150         down(&seq->lss_sem);
151         rc = __seq_server_alloc_super(seq, in, out, env);
152         up(&seq->lss_sem);
153
154         RETURN(rc);
155 }
156
157 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
158                                    struct lu_range *in,
159                                    struct lu_range *out,
160                                    const struct lu_env *env)
161 {
162         struct lu_range *space = &seq->lss_space;
163         int rc = 0;
164         ENTRY;
165
166         LASSERT(range_is_sane(space));
167
168         /*
169          * This is recovery case. Adjust super range if input range looks like
170          * it is allocated from new super.
171          */
172         if (in != NULL) {
173                 CDEBUG(D_INFO, "%s: Input seq range: "
174                        DRANGE"\n", seq->lss_name, PRANGE(in));
175
176                 if (range_is_exhausted(space)) {
177                         /*
178                          * Server cannot send empty range to client, this is why
179                          * we check here that range from client is "newer" than
180                          * exhausted super.
181                          */
182                         LASSERT(in->lr_end > space->lr_start);
183
184                         /*
185                          * Start is set to end of last allocated, because it
186                          * *is* already allocated so we take that into account
187                          * and do not use for other allocations.
188                          */
189                         space->lr_start = in->lr_end;
190
191                         /*
192                          * End is set to in->lr_start + super sequence
193                          * allocation unit. That is because in->lr_start is
194                          * first seq in new allocated range from controller
195                          * before failure.
196                          */
197                         space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
198
199                         if (!seq->lss_cli) {
200                                 CERROR("%s: No sequence controller "
201                                        "is attached.\n", seq->lss_name);
202                                 RETURN(-ENODEV);
203                         }
204
205                         /*
206                          * Let controller know that this is recovery and last
207                          * obtained range from it was @space.
208                          */
209                         rc = seq_client_replay_super(seq->lss_cli, space, env);
210                         if (rc) {
211                                 CERROR("%s: Can't replay super-sequence, "
212                                        "rc %d\n", seq->lss_name, rc);
213                                 RETURN(rc);
214                         }
215                 } else {
216                         /*
217                          * Update super start by end from client's range. Super
218                          * end should not be changed if range was not exhausted.
219                          */
220                         if (in->lr_end > space->lr_start)
221                                 space->lr_start = in->lr_end;
222                 }
223
224                 *out = *in;
225
226                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
227                        seq->lss_name, PRANGE(space));
228         } else {
229                 /*
230                  * XXX: Avoid cascading RPCs using kind of async preallocation
231                  * when meta-sequence is close to exhausting.
232                  */
233                 if (range_is_exhausted(space)) {
234                         if (!seq->lss_cli) {
235                                 CERROR("%s: No sequence controller "
236                                        "is attached.\n", seq->lss_name);
237                                 RETURN(-ENODEV);
238                         }
239
240                         rc = seq_client_alloc_super(seq->lss_cli, env);
241                         if (rc) {
242                                 CERROR("%s: Can't allocate super-sequence, "
243                                        "rc %d\n", seq->lss_name, rc);
244                                 RETURN(rc);
245                         }
246
247                         /* Saving new range to allocation space. */
248                         *space = seq->lss_cli->lcs_space;
249                         LASSERT(range_is_sane(space));
250                 }
251
252                 range_alloc(out, space, seq->lss_width);
253         }
254
255         rc = seq_store_write(seq, env);
256         if (rc) {
257                 CERROR("%s: Can't write space data, rc %d\n",
258                        seq->lss_name, rc);
259         }
260
261         if (rc == 0) {
262                 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
263                        DRANGE"\n", seq->lss_name, PRANGE(out));
264         }
265
266         RETURN(rc);
267 }
268
269 int seq_server_alloc_meta(struct lu_server_seq *seq,
270                           struct lu_range *in,
271                           struct lu_range *out,
272                           const struct lu_env *env)
273 {
274         int rc;
275         ENTRY;
276
277         down(&seq->lss_sem);
278         rc = __seq_server_alloc_meta(seq, in, out, env);
279         up(&seq->lss_sem);
280
281         RETURN(rc);
282 }
283 EXPORT_SYMBOL(seq_server_alloc_meta);
284
285 static int seq_server_handle(struct lu_site *site,
286                              const struct lu_env *env,
287                              __u32 opc, struct lu_range *in,
288                              struct lu_range *out)
289 {
290         int rc;
291         ENTRY;
292
293         switch (opc) {
294         case SEQ_ALLOC_META:
295                 if (!site->ls_server_seq) {
296                         CERROR("Sequence server is not "
297                                "initialized\n");
298                         RETURN(-EINVAL);
299                 }
300                 rc = seq_server_alloc_meta(site->ls_server_seq,
301                                            in, out, env);
302                 break;
303         case SEQ_ALLOC_SUPER:
304                 if (!site->ls_control_seq) {
305                         CERROR("Sequence controller is not "
306                                "initialized\n");
307                         RETURN(-EINVAL);
308                 }
309                 rc = seq_server_alloc_super(site->ls_control_seq,
310                                             in, out, env);
311                 break;
312         default:
313                 rc = -EINVAL;
314                 break;
315         }
316
317         RETURN(rc);
318 }
319
320 static int seq_req_handle(struct ptlrpc_request *req,
321                           const struct lu_env *env,
322                           struct seq_thread_info *info)
323 {
324         struct lu_range *out, *in = NULL;
325         struct lu_site *site;
326         int rc = -EPROTO;
327         __u32 *opc;
328         ENTRY;
329
330         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
331         LASSERT(site != NULL);
332                         
333         rc = req_capsule_pack(&info->sti_pill);
334         if (rc)
335                 RETURN(err_serious(rc));
336
337         opc = req_capsule_client_get(&info->sti_pill,
338                                      &RMF_SEQ_OPC);
339         if (opc != NULL) {
340                 out = req_capsule_server_get(&info->sti_pill,
341                                              &RMF_SEQ_RANGE);
342                 if (out == NULL)
343                         RETURN(err_serious(-EPROTO));
344
345                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
346                         in = req_capsule_client_get(&info->sti_pill,
347                                                     &RMF_SEQ_RANGE);
348
349                         LASSERT(!range_is_zero(in) && range_is_sane(in));
350                 }
351
352                 rc = seq_server_handle(site, env, *opc, in, out);
353         } else
354                 rc = err_serious(-EPROTO);
355
356         RETURN(rc);
357 }
358
359 /*
 * Context key constructor/destructor: seq_key_init, seq_key_fini. The type
 * handed to the macro is struct seq_thread_info, which is what seq_handle()
 * gets back from lu_context_key_get() for this key.
 */
360 LU_KEY_INIT_FINI(seq, struct seq_thread_info);
361
362 /*
 * Context key: seq_thread_key. It is registered in fid_mod_init(),
 * deregistered in fid_mod_exit() and looked up in seq_handle().
 */
363 LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD);
364
365 static void seq_thread_info_init(struct ptlrpc_request *req,
366                                  struct seq_thread_info *info)
367 {
368         int i;
369
370         /* Mark rep buffer as req-layout stuff expects */
371         for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
372                 info->sti_rep_buf_size[i] = -1;
373
374         /* Init request capsule */
375         req_capsule_init(&info->sti_pill, req, RCL_SERVER,
376                          info->sti_rep_buf_size);
377
378         req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
379 }
380
381 static void seq_thread_info_fini(struct seq_thread_info *info)
382 {
383         req_capsule_fini(&info->sti_pill);
384 }
385
386 static int seq_handle(struct ptlrpc_request *req)
387 {
388         const struct lu_env *env;
389         struct seq_thread_info *info;
390         int rc;
391
392         env = req->rq_svc_thread->t_env;
393         LASSERT(env != NULL);
394
395         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
396         LASSERT(info != NULL);
397
398         seq_thread_info_init(req, info);
399         rc = seq_req_handle(req, env, info);
400         seq_thread_info_fini(info);
401
402         return rc;
403 }
404
405 /*
406  * Entry point for handling FLD RPCs called from MDT.
407  */
408 int seq_query(struct com_thread_info *info)
409 {
410         return seq_handle(info->cti_pill.rc_req);
411 }
412 EXPORT_SYMBOL(seq_query);
413
414 static void seq_server_proc_fini(struct lu_server_seq *seq);
415
416 #ifdef LPROCFS
417 static int seq_server_proc_init(struct lu_server_seq *seq)
418 {
419         int rc;
420         ENTRY;
421
422         seq->lss_proc_dir = lprocfs_register(seq->lss_name,
423                                              seq_type_proc_dir,
424                                              NULL, NULL);
425         if (IS_ERR(seq->lss_proc_dir)) {
426                 rc = PTR_ERR(seq->lss_proc_dir);
427                 RETURN(rc);
428         }
429
430         rc = lprocfs_add_vars(seq->lss_proc_dir,
431                               seq_server_proc_list, seq);
432         if (rc) {
433                 CERROR("%s: Can't init sequence manager "
434                        "proc, rc %d\n", seq->lss_name, rc);
435                 GOTO(out_cleanup, rc);
436         }
437
438         RETURN(0);
439
440 out_cleanup:
441         seq_server_proc_fini(seq);
442         return rc;
443 }
444
445 static void seq_server_proc_fini(struct lu_server_seq *seq)
446 {
447         ENTRY;
448         if (seq->lss_proc_dir != NULL) {
449                 if (!IS_ERR(seq->lss_proc_dir))
450                         lprocfs_remove(&seq->lss_proc_dir);
451                 seq->lss_proc_dir = NULL;
452         }
453         EXIT;
454 }
455 #else
/* Variant used when LPROCFS is not defined: nothing to set up, returns 0. */
static int seq_server_proc_init(struct lu_server_seq *seq)
{
        return 0;
}
460
/* Variant used when LPROCFS is not defined: nothing to tear down. */
static void seq_server_proc_fini(struct lu_server_seq *seq)
{
}
465 #endif
466
467 int seq_server_init(struct lu_server_seq *seq,
468                     struct dt_device *dev,
469                     const char *prefix,
470                     enum lu_mgr_type type,
471                     const struct lu_env *env)
472 {
473         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
474         ENTRY;
475
476         LASSERT(dev != NULL);
477         LASSERT(prefix != NULL);
478
479         seq->lss_cli = NULL;
480         seq->lss_type = type;
481         range_zero(&seq->lss_space);
482         sema_init(&seq->lss_sem, 1);
483
484         seq->lss_width = is_srv ?
485                 LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
486
487         snprintf(seq->lss_name, sizeof(seq->lss_name),
488                  "%s-%s", (is_srv ? "srv" : "ctl"), prefix);
489
490         rc = seq_store_init(seq, env, dev);
491         if (rc)
492                 GOTO(out, rc);
493
494         /* Request backing store for saved sequence info. */
495         rc = seq_store_read(seq, env);
496         if (rc == -ENODATA) {
497
498                 /* Nothing is read, init by default value. */
499                 seq->lss_space = is_srv ?
500                         LUSTRE_SEQ_ZERO_RANGE:
501                         LUSTRE_SEQ_SPACE_RANGE;
502
503                 CDEBUG(D_INFO, "%s: No data found "
504                        "on store. Initialize space\n",
505                        seq->lss_name);
506
507                 /* Save default controller value to store. */
508                 rc = seq_store_write(seq, env);
509                 if (rc) {
510                         CERROR("%s: Can't write space data, "
511                                "rc %d\n", seq->lss_name, rc);
512                 }
513         } else if (rc) {
514                 CERROR("%s: Can't read space data, rc %d\n",
515                        seq->lss_name, rc);
516                 GOTO(out, rc);
517         }
518
519         if (is_srv) {
520                 LASSERT(range_is_sane(&seq->lss_space));
521         } else {
522                 LASSERT(!range_is_zero(&seq->lss_space) &&
523                         range_is_sane(&seq->lss_space));
524         }
525
526         rc  = seq_server_proc_init(seq);
527         if (rc)
528                 GOTO(out, rc);
529
530         EXIT;
531 out:
532         if (rc)
533                 seq_server_fini(seq, env);
534         return rc;
535 }
536 EXPORT_SYMBOL(seq_server_init);
537
538 void seq_server_fini(struct lu_server_seq *seq,
539                      const struct lu_env *env)
540 {
541         ENTRY;
542
543         seq_server_proc_fini(seq);
544         seq_store_fini(seq, env);
545
546         EXIT;
547 }
548 EXPORT_SYMBOL(seq_server_fini);
549
/*
 * Proc directory registered in fid_mod_init() under proc_lustre_root with
 * the name LUSTRE_SEQ_NAME and removed in fid_mod_exit(). It is passed as
 * the parent when seq_server_proc_init() registers a per-manager directory.
 */
550 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
551
552 static int __init fid_mod_init(void)
553 {
554         seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
555                                              proc_lustre_root,
556                                              NULL, NULL);
557         if (IS_ERR(seq_type_proc_dir))
558                 return PTR_ERR(seq_type_proc_dir);
559
560         LU_CONTEXT_KEY_INIT(&seq_thread_key);
561         lu_context_key_register(&seq_thread_key);
562         return 0;
563 }
564
565 static void __exit fid_mod_exit(void)
566 {
567         lu_context_key_degister(&seq_thread_key);
568         if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
569                 lprocfs_remove(&seq_type_proc_dir);
570                 seq_type_proc_dir = NULL;
571         }
572 }
573
/* Module metadata. */
574 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
575 MODULE_DESCRIPTION("Lustre FID Module");
576 MODULE_LICENSE("GPL");
577
/*
 * Declares the "fid" module with fid_mod_init()/fid_mod_exit() as its
 * init and exit routines (presumably - cfs_module is defined outside this
 * file; verify against libcfs).
 */
578 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);
579 #endif