Whamcloud - gitweb
LU-909 osd: changes to osd api
[fs/lustre-release.git] / lustre / fld / fld_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  * Copyright (c) 2011 Whamcloud, Inc.
32  */
33 /*
34  * This file is part of Lustre, http://www.lustre.org/
35  * Lustre is a trademark of Sun Microsystems, Inc.
36  *
37  * lustre/fld/fld_handler.c
38  *
39  * FLD (Fids Location Database)
40  *
41  * Author: Yury Umanets <umka@clusterfs.com>
42  * Author: WangDi <wangdi@clusterfs.com>
43  * Author: Pravin Shelar <pravin.shelar@sun.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_FLD
50
51 #ifdef __KERNEL__
52 # include <libcfs/libcfs.h>
53 # include <linux/module.h>
54 # include <linux/jbd.h>
55 # include <asm/div64.h>
56 #else /* __KERNEL__ */
57 # include <liblustre.h>
58 # include <libcfs/list.h>
59 #endif
60
61 #include <obd.h>
62 #include <obd_class.h>
63 #include <lustre_ver.h>
64 #include <obd_support.h>
65 #include <lprocfs_status.h>
66
67 #include <md_object.h>
68 #include <lustre_fid.h>
69 #include <lustre_req_layout.h>
70 #include "fld_internal.h"
71 #include <lustre_fid.h>
72
73 #ifdef __KERNEL__
74
75 /* context key constructor/destructor: fld_key_init, fld_key_fini */
76 LU_KEY_INIT_FINI(fld, struct fld_thread_info);
77
78 /* context key: fld_thread_key */
79 LU_CONTEXT_KEY_DEFINE(fld, LCT_MD_THREAD|LCT_DT_THREAD);
80
81 cfs_proc_dir_entry_t *fld_type_proc_dir = NULL;
82
83 static struct lu_local_obj_desc llod_fld_index = {
84         .llod_name      = fld_index_name,
85         .llod_oid       = FLD_INDEX_OID,
86         .llod_is_index  = 1,
87         .llod_feat      = &fld_index_features,
88 };
89
90 static int __init fld_mod_init(void)
91 {
92         fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME,
93                                              proc_lustre_root,
94                                              NULL, NULL);
95         if (IS_ERR(fld_type_proc_dir))
96                 return PTR_ERR(fld_type_proc_dir);
97
98         llo_local_obj_register(&llod_fld_index);
99
100         LU_CONTEXT_KEY_INIT(&fld_thread_key);
101         lu_context_key_register(&fld_thread_key);
102         return 0;
103 }
104
105 static void __exit fld_mod_exit(void)
106 {
107         llo_local_obj_unregister(&llod_fld_index);
108         lu_context_key_degister(&fld_thread_key);
109         if (fld_type_proc_dir != NULL && !IS_ERR(fld_type_proc_dir)) {
110                 lprocfs_remove(&fld_type_proc_dir);
111                 fld_type_proc_dir = NULL;
112         }
113 }
114
115 int fld_declare_server_create(struct lu_server_fld *fld,
116                               const struct lu_env *env,
117                               struct thandle *th)
118 {
119         struct dt_object *dt_obj = fld->lsf_obj;
120         int rc;
121
122         ENTRY;
123
124         /* for ldiskfs OSD it's enough to declare operation with any ops
125          * with DMU we'll probably need to specify exact key/value */
126         rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
127         if (rc)
128                 GOTO(out, rc);
129         rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
130         if (rc)
131                 GOTO(out, rc);
132         rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj,
133                                                       NULL, NULL, th);
134 out:
135         RETURN(rc);
136 }
137 EXPORT_SYMBOL(fld_declare_server_create);
138
139 /**
140  * Insert FLD index entry and update FLD cache.
141  *
142  * First it try to merge given range with existing range then update
143  * FLD index and FLD cache accordingly. FLD index consistency is maintained
144  * by this function.
145  * This function is called from the sequence allocator when a super-sequence
146  * is granted to a server.
147  */
148
149 int fld_server_create(struct lu_server_fld *fld,
150                       const struct lu_env *env,
151                       struct lu_seq_range *add_range,
152                       struct thandle *th)
153 {
154         struct lu_seq_range *erange;
155         struct lu_seq_range *new;
156         struct fld_thread_info *info;
157         int rc = 0;
158         int do_merge=0;
159
160         ENTRY;
161
162         info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
163         cfs_mutex_lock(&fld->lsf_lock);
164
165         erange = &info->fti_lrange;
166         new = &info->fti_irange;
167         *new = *add_range;
168
169         /* STEP 1: try to merge with previous range */
170         rc = fld_index_lookup(fld, env, new->lsr_start, erange);
171         if (rc == 0) {
172                 /* in case of range overlap, the location must be same */
173                 if (range_compare_loc(new, erange) != 0) {
174                         CERROR("the start of given range "DRANGE" conflict to"
175                                "an existing range "DRANGE"\n",
176                                PRANGE(new), PRANGE(erange));
177                         GOTO(out, rc = -EIO);
178                 }
179
180                 if (new->lsr_end < erange->lsr_end)
181                         GOTO(out, rc);
182                 do_merge = 1;
183         } else if (rc == -ENOENT) {
184                 /* check for merge case: optimizes for single mds lustre.
185                  * As entry does not exist, returned entry must be left side
186                  * entry compared to start of new range (ref dio_lookup()).
187                  * So try to merge from left.
188                  */
189                 if (new->lsr_start == erange->lsr_end &&
190                     range_compare_loc(new, erange) == 0)
191                         do_merge = 1;
192         } else {
193                 /* no overlap allowed in fld, so failure in lookup is error */
194                 GOTO(out, rc);
195         }
196
197         if (do_merge) {
198                 /* new range will be merged with the existing one.
199                  * delete this range at first. */
200                 rc = fld_index_delete(fld, env, erange, th);
201                 if (rc != 0)
202                         GOTO(out, rc);
203
204                 new->lsr_start = min(erange->lsr_start, new->lsr_start);
205                 new->lsr_end = max(erange->lsr_end, new->lsr_end);
206                 do_merge = 0;
207         }
208
209         /* STEP 2: try to merge with next range */
210         rc = fld_index_lookup(fld, env, new->lsr_end, erange);
211         if (rc == 0) {
212                 /* found a matched range, meaning we're either
213                  * overlapping or ajacent, must merge with it. */
214                 do_merge = 1;
215         } else if (rc == -ENOENT) {
216                 /* this range is left of new range end point */
217                 LASSERT(erange->lsr_end <= new->lsr_end);
218                 /*
219                  * the found left range must be either:
220                  *  1. withing new range.
221                  *  2. left of new range (no overlapping).
222                  * because if they're partly overlapping, the STEP 1 must have
223                  * been removed this range.
224                  */
225                 LASSERTF(erange->lsr_start > new->lsr_start ||
226                          erange->lsr_end < new->lsr_start ||
227                          (erange->lsr_end == new->lsr_start &&
228                           range_compare_loc(new, erange) != 0),
229                          "left "DRANGE", new "DRANGE"\n",
230                          PRANGE(erange), PRANGE(new));
231
232                 /* if it's within the new range, merge it */
233                 if (erange->lsr_start > new->lsr_start)
234                         do_merge = 1;
235         } else {
236                GOTO(out, rc);
237         }
238
239         if (do_merge) {
240                 if (range_compare_loc(new, erange) != 0) {
241                         CERROR("the end of given range "DRANGE" overlaps "
242                                "with an existing range "DRANGE"\n",
243                                PRANGE(new), PRANGE(erange));
244                         GOTO(out, rc = -EIO);
245                 }
246
247                 /* merge with next range */
248                 rc = fld_index_delete(fld, env, erange, th);
249                 if (rc != 0)
250                         GOTO(out, rc);
251
252                 new->lsr_start = min(erange->lsr_start, new->lsr_start);
253                 new->lsr_end = max(erange->lsr_end, new->lsr_end);
254         }
255
256         /* now update fld entry. */
257         rc = fld_index_create(fld, env, new, th);
258
259         LASSERT(rc != -EEXIST);
260 out:
261         if (rc == 0)
262                 fld_cache_insert(fld->lsf_cache, new);
263
264         cfs_mutex_unlock(&fld->lsf_lock);
265
266         CDEBUG((rc != 0 ? D_ERROR : D_INFO),
267                "%s: FLD create: given range : "DRANGE
268                "after merge "DRANGE" rc = %d \n", fld->lsf_name,
269                 PRANGE(add_range), PRANGE(new), rc);
270
271         RETURN(rc);
272 }
273 EXPORT_SYMBOL(fld_server_create);
274
275 /**
276  *  Lookup mds by seq, returns a range for given seq.
277  *
278  *  If that entry is not cached in fld cache, request is sent to super
279  *  sequence controller node (MDT0). All other MDT[1...N] and client
280  *  cache fld entries, but this cache is not persistent.
281  */
282
283 int fld_server_lookup(struct lu_server_fld *fld,
284                       const struct lu_env *env,
285                       seqno_t seq, struct lu_seq_range *range)
286 {
287         struct lu_seq_range *erange;
288         struct fld_thread_info *info;
289         int rc;
290         ENTRY;
291
292         info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
293         erange = &info->fti_lrange;
294
295         /* Lookup it in the cache. */
296         rc = fld_cache_lookup(fld->lsf_cache, seq, erange);
297         if (rc == 0) {
298                 if (unlikely(erange->lsr_flags != range->lsr_flags)) {
299                         CERROR("FLD cache found a range "DRANGE" doesn't "
300                                "match the requested flag %x\n",
301                                PRANGE(erange), range->lsr_flags);
302                         RETURN(-EIO);
303                 }
304                 *range = *erange;
305                 RETURN(0);
306         }
307
308         if (fld->lsf_obj) {
309                 rc = fld_index_lookup(fld, env, seq, erange);
310                 if (rc == 0) {
311                         if (unlikely(erange->lsr_flags != range->lsr_flags)) {
312                                 CERROR("FLD found a range "DRANGE" doesn't "
313                                        "match the requested flag %x\n",
314                                        PRANGE(erange), range->lsr_flags);
315                                 RETURN(-EIO);
316                         }
317                         *range = *erange;
318                 }
319         } else {
320                 LASSERT(fld->lsf_control_exp);
321                 /* send request to mdt0 i.e. super seq. controller.
322                  * This is temporary solution, long term solution is fld
323                  * replication on all mdt servers.
324                  */
325                 rc = fld_client_rpc(fld->lsf_control_exp,
326                                     range, FLD_LOOKUP);
327         }
328
329         if (rc == 0)
330                 fld_cache_insert(fld->lsf_cache, range);
331
332         RETURN(rc);
333 }
334 EXPORT_SYMBOL(fld_server_lookup);
335
336 /**
337  * All MDT server handle fld lookup operation. But only MDT0 has fld index.
338  * if entry is not found in cache we need to forward lookup request to MDT0
339  */
340
341 static int fld_server_handle(struct lu_server_fld *fld,
342                              const struct lu_env *env,
343                              __u32 opc, struct lu_seq_range *range,
344                              struct fld_thread_info *info)
345 {
346         int rc;
347         ENTRY;
348
349         switch (opc) {
350         case FLD_LOOKUP:
351                 rc = fld_server_lookup(fld, env,
352                                        range->lsr_start, range);
353                 break;
354         default:
355                 rc = -EINVAL;
356                 break;
357         }
358
359         CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
360                DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range));
361
362         RETURN(rc);
363
364 }
365
366 static int fld_req_handle(struct ptlrpc_request *req,
367                           struct fld_thread_info *info)
368 {
369         struct obd_export *exp = req->rq_export;
370         struct lu_site *site = exp->exp_obd->obd_lu_dev->ld_site;
371         struct lu_seq_range *in;
372         struct lu_seq_range *out;
373         int rc;
374         __u32 *opc;
375         ENTRY;
376
377         rc = req_capsule_server_pack(info->fti_pill);
378         if (rc)
379                 RETURN(err_serious(rc));
380
381         opc = req_capsule_client_get(info->fti_pill, &RMF_FLD_OPC);
382         if (opc != NULL) {
383                 in = req_capsule_client_get(info->fti_pill, &RMF_FLD_MDFLD);
384                 if (in == NULL)
385                         RETURN(err_serious(-EPROTO));
386                 out = req_capsule_server_get(info->fti_pill, &RMF_FLD_MDFLD);
387                 if (out == NULL)
388                         RETURN(err_serious(-EPROTO));
389                 *out = *in;
390
391                 /* For old 2.0 client, the 'lsr_flags' is uninitialized.
392                  * Set it as 'LU_SEQ_RANGE_MDT' by default.
393                  * Old 2.0 liblustre client cannot talk with new 2.1 server. */
394                 if (!(exp->exp_connect_flags & OBD_CONNECT_64BITHASH) &&
395                     !exp->exp_libclient)
396                         out->lsr_flags = LU_SEQ_RANGE_MDT;
397
398                 rc = fld_server_handle(lu_site2md(site)->ms_server_fld,
399                                        req->rq_svc_thread->t_env,
400                                        *opc, out, info);
401         } else
402                 rc = err_serious(-EPROTO);
403
404         RETURN(rc);
405 }
406
407 static void fld_thread_info_init(struct ptlrpc_request *req,
408                                  struct fld_thread_info *info)
409 {
410         info->fti_pill = &req->rq_pill;
411         /* Init request capsule. */
412         req_capsule_init(info->fti_pill, req, RCL_SERVER);
413         req_capsule_set(info->fti_pill, &RQF_FLD_QUERY);
414 }
415
416 static void fld_thread_info_fini(struct fld_thread_info *info)
417 {
418         req_capsule_fini(info->fti_pill);
419 }
420
421 static int fld_handle(struct ptlrpc_request *req)
422 {
423         struct fld_thread_info *info;
424         const struct lu_env *env;
425         int rc;
426
427         env = req->rq_svc_thread->t_env;
428         LASSERT(env != NULL);
429
430         info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
431         LASSERT(info != NULL);
432
433         fld_thread_info_init(req, info);
434         rc = fld_req_handle(req, info);
435         fld_thread_info_fini(info);
436
437         return rc;
438 }
439
440 /*
441  * Entry point for handling FLD RPCs called from MDT.
442  */
443 int fld_query(struct com_thread_info *info)
444 {
445         return fld_handle(info->cti_pill->rc_req);
446 }
447 EXPORT_SYMBOL(fld_query);
448
449 /*
450  * Returns true, if fid is local to this server node.
451  *
452  * WARNING: this function is *not* guaranteed to return false if fid is
453  * remote: it makes an educated conservative guess only.
454  *
455  * fid_is_local() is supposed to be used in assertion checks only.
456  */
457 int fid_is_local(const struct lu_env *env,
458                  struct lu_site *site, const struct lu_fid *fid)
459 {
460         int result;
461         struct md_site *msite;
462         struct lu_seq_range *range;
463         struct fld_thread_info *info;
464         ENTRY;
465
466         info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
467         range = &info->fti_lrange;
468
469         result = 1; /* conservatively assume fid is local */
470         msite = lu_site2md(site);
471         if (msite->ms_client_fld != NULL) {
472                 int rc;
473
474                 rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
475                                       fid_seq(fid), range);
476                 if (rc == 0)
477                         result = (range->lsr_index == msite->ms_node_id);
478         }
479         return result;
480 }
481 EXPORT_SYMBOL(fid_is_local);
482
483 static void fld_server_proc_fini(struct lu_server_fld *fld);
484
485 #ifdef LPROCFS
486 static int fld_server_proc_init(struct lu_server_fld *fld)
487 {
488         int rc = 0;
489         ENTRY;
490
491         fld->lsf_proc_dir = lprocfs_register(fld->lsf_name,
492                                              fld_type_proc_dir,
493                                              fld_server_proc_list, fld);
494         if (IS_ERR(fld->lsf_proc_dir)) {
495                 rc = PTR_ERR(fld->lsf_proc_dir);
496                 RETURN(rc);
497         }
498
499         RETURN(rc);
500 }
501
502 static void fld_server_proc_fini(struct lu_server_fld *fld)
503 {
504         ENTRY;
505         if (fld->lsf_proc_dir != NULL) {
506                 if (!IS_ERR(fld->lsf_proc_dir))
507                         lprocfs_remove(&fld->lsf_proc_dir);
508                 fld->lsf_proc_dir = NULL;
509         }
510         EXIT;
511 }
512 #else
513 static int fld_server_proc_init(struct lu_server_fld *fld)
514 {
515         return 0;
516 }
517
518 static void fld_server_proc_fini(struct lu_server_fld *fld)
519 {
520         return;
521 }
522 #endif
523
524 int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
525                     const char *prefix, const struct lu_env *env,
526                     int mds_node_id)
527 {
528         int cache_size, cache_threshold;
529         struct lu_seq_range range;
530         int rc;
531         ENTRY;
532
533         snprintf(fld->lsf_name, sizeof(fld->lsf_name),
534                  "srv-%s", prefix);
535
536         cache_size = FLD_SERVER_CACHE_SIZE /
537                 sizeof(struct fld_cache_entry);
538
539         cache_threshold = cache_size *
540                 FLD_SERVER_CACHE_THRESHOLD / 100;
541
542         cfs_mutex_init(&fld->lsf_lock);
543         fld->lsf_cache = fld_cache_init(fld->lsf_name,
544                                         cache_size, cache_threshold);
545         if (IS_ERR(fld->lsf_cache)) {
546                 rc = PTR_ERR(fld->lsf_cache);
547                 fld->lsf_cache = NULL;
548                 GOTO(out, rc);
549         }
550
551         if (!mds_node_id) {
552                 rc = fld_index_init(fld, env, dt);
553                 if (rc)
554                         GOTO(out, rc);
555         } else
556                 fld->lsf_obj = NULL;
557
558         rc = fld_server_proc_init(fld);
559         if (rc)
560                 GOTO(out, rc);
561
562         fld->lsf_control_exp = NULL;
563
564         /* Insert reserved sequence number of ".lustre" into fld cache. */
565         range.lsr_start = FID_SEQ_DOT_LUSTRE;
566         range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
567         range.lsr_index = 0;
568         range.lsr_flags = LU_SEQ_RANGE_MDT;
569         fld_cache_insert(fld->lsf_cache, &range);
570
571         EXIT;
572 out:
573         if (rc)
574                 fld_server_fini(fld, env);
575         return rc;
576 }
577 EXPORT_SYMBOL(fld_server_init);
578
579 void fld_server_fini(struct lu_server_fld *fld,
580                      const struct lu_env *env)
581 {
582         ENTRY;
583
584         fld_server_proc_fini(fld);
585         fld_index_fini(fld, env);
586
587         if (fld->lsf_cache != NULL) {
588                 if (!IS_ERR(fld->lsf_cache))
589                         fld_cache_fini(fld->lsf_cache);
590                 fld->lsf_cache = NULL;
591         }
592
593         EXIT;
594 }
595 EXPORT_SYMBOL(fld_server_fini);
596
597 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
598 MODULE_DESCRIPTION("Lustre FLD");
599 MODULE_LICENSE("GPL");
600
601 cfs_module(mdd, "0.1.0", fld_mod_init, fld_mod_exit);
602 #endif