Whamcloud - gitweb
LU-1214 ptlrpc: removes client lu_target.h/target.c dependency
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/fid/fid_handler.c
39  *
40  * Lustre Sequence Manager
41  *
42  * Author: Yury Umanets <umka@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_FID
49
50 #ifdef __KERNEL__
51 # include <libcfs/libcfs.h>
52 # include <linux/module.h>
53 #else /* __KERNEL__ */
54 # include <liblustre.h>
55 #endif
56
57 #include <obd.h>
58 #include <obd_class.h>
59 #include <dt_object.h>
60 #include <md_object.h>
61 #include <obd_support.h>
62 #include <lustre_req_layout.h>
63 #include <lustre_fid.h>
64 #include "fid_internal.h"
65
66 #ifdef __KERNEL__
67 /* Assigns client to sequence controller node. */
68 int seq_server_set_cli(struct lu_server_seq *seq,
69                        struct lu_client_seq *cli,
70                        const struct lu_env *env)
71 {
72         int rc = 0;
73         ENTRY;
74
75         /*
76          * Ask client for new range, assign that range to ->seq_space and write
77          * seq state to backing store should be atomic.
78          */
79         cfs_mutex_lock(&seq->lss_mutex);
80
81         if (cli == NULL) {
82                 CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
83                        seq->lss_name, cli->lcs_name);
84                 seq->lss_cli = cli;
85                 GOTO(out_up, rc = 0);
86         }
87
88         if (seq->lss_cli != NULL) {
89                 CERROR("%s: Sequence controller is already "
90                        "assigned\n", seq->lss_name);
91                 GOTO(out_up, rc = -EINVAL);
92         }
93
94         CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
95                seq->lss_name, cli->lcs_name);
96
97         seq->lss_cli = cli;
98         cli->lcs_space.lsr_index = seq->lss_site->ms_node_id;
99         EXIT;
100 out_up:
101         cfs_mutex_unlock(&seq->lss_mutex);
102         return rc;
103 }
104 EXPORT_SYMBOL(seq_server_set_cli);
105 /*
106  * allocate \a w units of sequence from range \a from.
107  */
108 static inline void range_alloc(struct lu_seq_range *to,
109                                struct lu_seq_range *from,
110                                __u64 width)
111 {
112         width = min(range_space(from), width);
113         to->lsr_start = from->lsr_start;
114         to->lsr_end = from->lsr_start + width;
115         from->lsr_start += width;
116 }
117
118 /**
119  * On controller node, allocate new super sequence for regular sequence server.
120  * As this super sequence controller, this node suppose to maintain fld
121  * and update index.
122  * \a out range always has currect mds node number of requester.
123  */
124
125 static int __seq_server_alloc_super(struct lu_server_seq *seq,
126                                     struct lu_seq_range *out,
127                                     const struct lu_env *env)
128 {
129         struct lu_seq_range *space = &seq->lss_space;
130         int rc;
131         ENTRY;
132
133         LASSERT(range_is_sane(space));
134
135         if (range_is_exhausted(space)) {
136                 CERROR("%s: Sequences space is exhausted\n",
137                        seq->lss_name);
138                 RETURN(-ENOSPC);
139         } else {
140                 range_alloc(out, space, seq->lss_width);
141         }
142
143         rc = seq_store_update(env, seq, out, 1 /* sync */);
144
145         CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
146                DRANGE"\n", seq->lss_name, rc, PRANGE(out));
147
148         RETURN(rc);
149 }
150
151 int seq_server_alloc_super(struct lu_server_seq *seq,
152                            struct lu_seq_range *out,
153                            const struct lu_env *env)
154 {
155         int rc;
156         ENTRY;
157
158         cfs_mutex_lock(&seq->lss_mutex);
159         rc = __seq_server_alloc_super(seq, out, env);
160         cfs_mutex_unlock(&seq->lss_mutex);
161
162         RETURN(rc);
163 }
164
165 static int __seq_set_init(const struct lu_env *env,
166                             struct lu_server_seq *seq)
167 {
168         struct lu_seq_range *space = &seq->lss_space;
169         int rc;
170
171         range_alloc(&seq->lss_lowater_set, space, seq->lss_set_width);
172         range_alloc(&seq->lss_hiwater_set, space, seq->lss_set_width);
173
174         rc = seq_store_update(env, seq, NULL, 1);
175
176         return rc;
177 }
178
179 /*
180  * This function implements new seq allocation algorithm using async
181  * updates to seq file on disk. ref bug 18857 for details.
182  * there are four variable to keep track of this process
183  *
184  * lss_space; - available lss_space
185  * lss_lowater_set; - lu_seq_range for all seqs before barrier, i.e. safe to use
186  * lss_hiwater_set; - lu_seq_range after barrier, i.e. allocated but may be
187  *                    not yet committed
188  *
189  * when lss_lowater_set reaches the end it is replaced with hiwater one and
190  * a write operation is initiated to allocate new hiwater range.
191  * if last seq write opearion is still not commited, current operation is
192  * flaged as sync write op.
193  */
194 static int range_alloc_set(const struct lu_env *env,
195                             struct lu_seq_range *out,
196                             struct lu_server_seq *seq)
197 {
198         struct lu_seq_range *space = &seq->lss_space;
199         struct lu_seq_range *loset = &seq->lss_lowater_set;
200         struct lu_seq_range *hiset = &seq->lss_hiwater_set;
201         int rc = 0;
202
203         if (range_is_zero(loset))
204                 __seq_set_init(env, seq);
205
206         if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_ALLOC)) /* exhaust set */
207                 loset->lsr_start = loset->lsr_end;
208
209         if (range_is_exhausted(loset)) {
210                 /* reached high water mark. */
211                 struct lu_device *dev = seq->lss_site->ms_lu.ls_top_dev;
212                 int obd_num_clients = dev->ld_obd->obd_num_exports;
213                 __u64 set_sz;
214
215                 /* calculate new seq width based on number of clients */
216                 set_sz = max(seq->lss_set_width,
217                              obd_num_clients * seq->lss_width);
218                 set_sz = min(range_space(space), set_sz);
219
220                 /* Switch to hiwater range now */
221                 *loset = *hiset;
222                 /* allocate new hiwater range */
223                 range_alloc(hiset, space, set_sz);
224
225                 /* update ondisk seq with new *space */
226                 rc = seq_store_update(env, seq, NULL, seq->lss_need_sync);
227         }
228
229         LASSERTF(!range_is_exhausted(loset) || range_is_sane(loset),
230                  DRANGE"\n", PRANGE(loset));
231
232         if (rc == 0)
233                 range_alloc(out, loset, seq->lss_width);
234
235         RETURN(rc);
236 }
237
238 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
239                                    struct lu_seq_range *out,
240                                    const struct lu_env *env)
241 {
242         struct lu_seq_range *space = &seq->lss_space;
243         int rc = 0;
244
245         ENTRY;
246
247         LASSERT(range_is_sane(space));
248
249         /* Check if available space ends and allocate new super seq */
250         if (range_is_exhausted(space)) {
251                 if (!seq->lss_cli) {
252                         CERROR("%s: No sequence controller is attached.\n",
253                                seq->lss_name);
254                         RETURN(-ENODEV);
255                 }
256
257                 rc = seq_client_alloc_super(seq->lss_cli, env);
258                 if (rc) {
259                         CERROR("%s: Can't allocate super-sequence, rc %d\n",
260                                seq->lss_name, rc);
261                         RETURN(rc);
262                 }
263
264                 /* Saving new range to allocation space. */
265                 *space = seq->lss_cli->lcs_space;
266                 LASSERT(range_is_sane(space));
267         }
268
269         rc = range_alloc_set(env, out, seq);
270         if (rc == 0) {
271                 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
272                        DRANGE"\n", seq->lss_name, PRANGE(out));
273         }
274
275         RETURN(rc);
276 }
277
278 int seq_server_alloc_meta(struct lu_server_seq *seq,
279                           struct lu_seq_range *out,
280                           const struct lu_env *env)
281 {
282         int rc;
283         ENTRY;
284
285         cfs_mutex_lock(&seq->lss_mutex);
286         rc = __seq_server_alloc_meta(seq, out, env);
287         cfs_mutex_unlock(&seq->lss_mutex);
288
289         RETURN(rc);
290 }
291 EXPORT_SYMBOL(seq_server_alloc_meta);
292
293 static int seq_server_handle(struct lu_site *site,
294                              const struct lu_env *env,
295                              __u32 opc, struct lu_seq_range *out)
296 {
297         int rc;
298         struct md_site *mite;
299         ENTRY;
300
301         mite = lu_site2md(site);
302         switch (opc) {
303         case SEQ_ALLOC_META:
304                 if (!mite->ms_server_seq) {
305                         CERROR("Sequence server is not "
306                                "initialized\n");
307                         RETURN(-EINVAL);
308                 }
309                 rc = seq_server_alloc_meta(mite->ms_server_seq, out, env);
310                 break;
311         case SEQ_ALLOC_SUPER:
312                 if (!mite->ms_control_seq) {
313                         CERROR("Sequence controller is not "
314                                "initialized\n");
315                         RETURN(-EINVAL);
316                 }
317                 rc = seq_server_alloc_super(mite->ms_control_seq, out, env);
318                 break;
319         default:
320                 rc = -EINVAL;
321                 break;
322         }
323
324         RETURN(rc);
325 }
326
327 static int seq_req_handle(struct ptlrpc_request *req,
328                           const struct lu_env *env,
329                           struct seq_thread_info *info)
330 {
331         struct lu_seq_range *out, *tmp;
332         struct lu_site *site;
333         int rc = -EPROTO;
334         __u32 *opc;
335         ENTRY;
336
337         LASSERT(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY));
338         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
339         LASSERT(site != NULL);
340
341         rc = req_capsule_server_pack(info->sti_pill);
342         if (rc)
343                 RETURN(err_serious(rc));
344
345         opc = req_capsule_client_get(info->sti_pill, &RMF_SEQ_OPC);
346         if (opc != NULL) {
347                 out = req_capsule_server_get(info->sti_pill, &RMF_SEQ_RANGE);
348                 if (out == NULL)
349                         RETURN(err_serious(-EPROTO));
350
351                 tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE);
352
353                 /* seq client passed mdt id, we need to pass that using out
354                  * range parameter */
355
356                 out->lsr_index = tmp->lsr_index;
357                 out->lsr_flags = tmp->lsr_flags;
358                 rc = seq_server_handle(site, env, *opc, out);
359         } else
360                 rc = err_serious(-EPROTO);
361
362         RETURN(rc);
363 }
364
365 /* context key constructor/destructor: seq_key_init, seq_key_fini */
366 LU_KEY_INIT_FINI(seq, struct seq_thread_info);
367
368 /* context key: seq_thread_key */
369 LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD);
370
371 static void seq_thread_info_init(struct ptlrpc_request *req,
372                                  struct seq_thread_info *info)
373 {
374         info->sti_pill = &req->rq_pill;
375         /* Init request capsule */
376         req_capsule_init(info->sti_pill, req, RCL_SERVER);
377         req_capsule_set(info->sti_pill, &RQF_SEQ_QUERY);
378 }
379
380 static void seq_thread_info_fini(struct seq_thread_info *info)
381 {
382         req_capsule_fini(info->sti_pill);
383 }
384
385 static int seq_handle(struct ptlrpc_request *req)
386 {
387         const struct lu_env *env;
388         struct seq_thread_info *info;
389         int rc;
390
391         env = req->rq_svc_thread->t_env;
392         LASSERT(env != NULL);
393
394         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
395         LASSERT(info != NULL);
396
397         seq_thread_info_init(req, info);
398         rc = seq_req_handle(req, env, info);
399         /* XXX: we don't need replay but MDT assign transno in any case,
400          * remove it manually before reply*/
401         lustre_msg_set_transno(req->rq_repmsg, 0);
402         seq_thread_info_fini(info);
403
404         return rc;
405 }
406
407 /*
408  * Entry point for handling FLD RPCs called from MDT.
409  */
410 int seq_query(struct com_thread_info *info)
411 {
412         return seq_handle(info->cti_pill->rc_req);
413 }
414 EXPORT_SYMBOL(seq_query);
415
416 static void seq_server_proc_fini(struct lu_server_seq *seq);
417
418 #ifdef LPROCFS
419 static int seq_server_proc_init(struct lu_server_seq *seq)
420 {
421         int rc;
422         ENTRY;
423
424         seq->lss_proc_dir = lprocfs_register(seq->lss_name,
425                                              seq_type_proc_dir,
426                                              NULL, NULL);
427         if (IS_ERR(seq->lss_proc_dir)) {
428                 rc = PTR_ERR(seq->lss_proc_dir);
429                 RETURN(rc);
430         }
431
432         rc = lprocfs_add_vars(seq->lss_proc_dir,
433                               seq_server_proc_list, seq);
434         if (rc) {
435                 CERROR("%s: Can't init sequence manager "
436                        "proc, rc %d\n", seq->lss_name, rc);
437                 GOTO(out_cleanup, rc);
438         }
439
440         RETURN(0);
441
442 out_cleanup:
443         seq_server_proc_fini(seq);
444         return rc;
445 }
446
447 static void seq_server_proc_fini(struct lu_server_seq *seq)
448 {
449         ENTRY;
450         if (seq->lss_proc_dir != NULL) {
451                 if (!IS_ERR(seq->lss_proc_dir))
452                         lprocfs_remove(&seq->lss_proc_dir);
453                 seq->lss_proc_dir = NULL;
454         }
455         EXIT;
456 }
457 #else
458 static int seq_server_proc_init(struct lu_server_seq *seq)
459 {
460         return 0;
461 }
462
463 static void seq_server_proc_fini(struct lu_server_seq *seq)
464 {
465         return;
466 }
467 #endif
468
469
470 int seq_server_init(struct lu_server_seq *seq,
471                     struct dt_device *dev,
472                     const char *prefix,
473                     enum lu_mgr_type type,
474                     struct md_site *ms,
475                     const struct lu_env *env)
476 {
477         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
478         ENTRY;
479
480         LASSERT(dev != NULL);
481         LASSERT(prefix != NULL);
482
483         seq->lss_cli = NULL;
484         seq->lss_type = type;
485         seq->lss_site = ms;
486         range_init(&seq->lss_space);
487
488         range_init(&seq->lss_lowater_set);
489         range_init(&seq->lss_hiwater_set);
490         seq->lss_set_width = LUSTRE_SEQ_BATCH_WIDTH;
491
492         cfs_mutex_init(&seq->lss_mutex);
493
494         seq->lss_width = is_srv ?
495                 LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
496
497         snprintf(seq->lss_name, sizeof(seq->lss_name),
498                  "%s-%s", (is_srv ? "srv" : "ctl"), prefix);
499
500         rc = seq_store_init(seq, env, dev);
501         if (rc)
502                 GOTO(out, rc);
503         /* Request backing store for saved sequence info. */
504         rc = seq_store_read(seq, env);
505         if (rc == -ENODATA) {
506
507                 /* Nothing is read, init by default value. */
508                 seq->lss_space = is_srv ?
509                         LUSTRE_SEQ_ZERO_RANGE:
510                         LUSTRE_SEQ_SPACE_RANGE;
511
512                 seq->lss_space.lsr_index = ms->ms_node_id;
513                 CDEBUG(D_INFO, "%s: No data found "
514                        "on store. Initialize space\n",
515                        seq->lss_name);
516
517                 rc = seq_store_update(env, seq, NULL, 0);
518                 if (rc) {
519                         CERROR("%s: Can't write space data, "
520                                "rc %d\n", seq->lss_name, rc);
521                 }
522         } else if (rc) {
523                 CERROR("%s: Can't read space data, rc %d\n",
524                        seq->lss_name, rc);
525                 GOTO(out, rc);
526         }
527
528         if (is_srv) {
529                 LASSERT(range_is_sane(&seq->lss_space));
530         } else {
531                 LASSERT(!range_is_zero(&seq->lss_space) &&
532                         range_is_sane(&seq->lss_space));
533         }
534
535         rc  = seq_server_proc_init(seq);
536         if (rc)
537                 GOTO(out, rc);
538
539         EXIT;
540 out:
541         if (rc)
542                 seq_server_fini(seq, env);
543         return rc;
544 }
545 EXPORT_SYMBOL(seq_server_init);
546
547 void seq_server_fini(struct lu_server_seq *seq,
548                      const struct lu_env *env)
549 {
550         ENTRY;
551
552         seq_server_proc_fini(seq);
553         seq_store_fini(seq, env);
554
555         EXIT;
556 }
557 EXPORT_SYMBOL(seq_server_fini);
558
559 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
560
561 static struct lu_local_obj_desc llod_seq_srv = {
562         .llod_name      = LUSTRE_SEQ_SRV_NAME,
563         .llod_oid       = FID_SEQ_SRV_OID,
564         .llod_is_index  = 0,
565 };
566
567 static struct lu_local_obj_desc llod_seq_ctl = {
568         .llod_name      = LUSTRE_SEQ_CTL_NAME,
569         .llod_oid       = FID_SEQ_CTL_OID,
570         .llod_is_index  = 0,
571 };
572
573 static int __init fid_mod_init(void)
574 {
575         seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
576                                              proc_lustre_root,
577                                              NULL, NULL);
578         if (IS_ERR(seq_type_proc_dir))
579                 return PTR_ERR(seq_type_proc_dir);
580
581         llo_local_obj_register(&llod_seq_srv);
582         llo_local_obj_register(&llod_seq_ctl);
583
584         LU_CONTEXT_KEY_INIT(&seq_thread_key);
585         lu_context_key_register(&seq_thread_key);
586         return 0;
587 }
588
589 static void __exit fid_mod_exit(void)
590 {
591         llo_local_obj_unregister(&llod_seq_srv);
592         llo_local_obj_unregister(&llod_seq_ctl);
593
594         lu_context_key_degister(&seq_thread_key);
595         if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
596                 lprocfs_remove(&seq_type_proc_dir);
597                 seq_type_proc_dir = NULL;
598         }
599 }
600
601 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
602 MODULE_DESCRIPTION("Lustre FID Module");
603 MODULE_LICENSE("GPL");
604
605 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);
606 #endif