Whamcloud - gitweb
b= 13093
[fs/lustre-release.git] / lustre / fld / fld_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_handler.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *           WangDi <wangdi@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FLD
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 # include <linux/jbd.h>
38 # include <asm/div64.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 # include <libcfs/list.h>
42 #endif
43
44 #include <obd.h>
45 #include <obd_class.h>
46 #include <lustre_ver.h>
47 #include <obd_support.h>
48 #include <lprocfs_status.h>
49
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include "fld_internal.h"
53
54 #ifdef __KERNEL__
55
56 LU_KEY_INIT_FINI(fld, struct fld_thread_info);
57
58 struct lu_context_key fld_thread_key = {
59         .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD,
60         .lct_init = fld_key_init,
61         .lct_fini = fld_key_fini
62 };
63
64 cfs_proc_dir_entry_t *fld_type_proc_dir = NULL;
65
66 static int __init fld_mod_init(void)
67 {
68         fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME,
69                                              proc_lustre_root,
70                                              NULL, NULL);
71         if (IS_ERR(fld_type_proc_dir))
72                 return PTR_ERR(fld_type_proc_dir);
73
74         LU_CONTEXT_KEY_INIT(&fld_thread_key);
75         lu_context_key_register(&fld_thread_key);
76         return 0;
77 }
78
79 static void __exit fld_mod_exit(void)
80 {
81         lu_context_key_degister(&fld_thread_key);
82         if (fld_type_proc_dir != NULL && !IS_ERR(fld_type_proc_dir)) {
83                 lprocfs_remove(&fld_type_proc_dir);
84                 fld_type_proc_dir = NULL;
85         }
86 }
87
88 /* Insert index entry and update cache. */
89 int fld_server_create(struct lu_server_fld *fld,
90                       const struct lu_env *env,
91                       seqno_t seq, mdsno_t mds)
92 {
93         int rc;
94         ENTRY;
95         
96         rc = fld_index_create(fld, env, seq, mds);
97         
98         if (rc == 0) {
99                 /*
100                  * Do not return result of calling fld_cache_insert()
101                  * here. First of all because it may return -EEXISTS. Another
102                  * reason is that, we do not want to stop proceeding even after
103                  * cache errors.
104                  */
105                 fld_cache_insert(fld->lsf_cache, seq, mds);
106         }
107
108         RETURN(rc);
109 }
110 EXPORT_SYMBOL(fld_server_create);
111
112 /* Delete index entry. */
113 int fld_server_delete(struct lu_server_fld *fld,
114                       const struct lu_env *env,
115                       seqno_t seq)
116 {
117         int rc;
118         ENTRY;
119
120         fld_cache_delete(fld->lsf_cache, seq);
121         rc = fld_index_delete(fld, env, seq);
122         
123         RETURN(rc);
124 }
125 EXPORT_SYMBOL(fld_server_delete);
126
127 /* Lookup mds by seq. */
128 int fld_server_lookup(struct lu_server_fld *fld,
129                       const struct lu_env *env,
130                       seqno_t seq, mdsno_t *mds)
131 {
132         int rc;
133         ENTRY;
134         
135         /* Lookup it in the cache. */
136         rc = fld_cache_lookup(fld->lsf_cache, seq, mds);
137         if (rc == 0)
138                 RETURN(0);
139
140         rc = fld_index_lookup(fld, env, seq, mds);
141         if (rc == 0) {
142                 /*
143                  * Do not return error here as well. See previous comment in
144                  * same situation in function fld_server_create().
145                  */
146                 fld_cache_insert(fld->lsf_cache, seq, *mds);
147         }
148         RETURN(rc);
149 }
150 EXPORT_SYMBOL(fld_server_lookup);
151
152 static int fld_server_handle(struct lu_server_fld *fld,
153                              const struct lu_env *env,
154                              __u32 opc, struct md_fld *mf,
155                              struct fld_thread_info *info)
156 {
157         int rc;
158         ENTRY;
159
160         switch (opc) {
161         case FLD_CREATE:
162                 rc = fld_server_create(fld, env,
163                                        mf->mf_seq, mf->mf_mds);
164
165                 /* Do not return -EEXIST error for resent case */
166                 if ((info->fti_flags & MSG_RESENT) && rc == -EEXIST)
167                         rc = 0;
168                 break;
169         case FLD_DELETE:
170                 rc = fld_server_delete(fld, env, mf->mf_seq);
171
172                 /* Do not return -ENOENT error for resent case */
173                 if ((info->fti_flags & MSG_RESENT) && rc == -ENOENT)
174                         rc = 0;
175                 break;
176         case FLD_LOOKUP:
177                 rc = fld_server_lookup(fld, env,
178                                        mf->mf_seq, &mf->mf_mds);
179                 break;
180         default:
181                 rc = -EINVAL;
182                 break;
183         }
184
185         CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, seq: "
186                LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc,
187                mf->mf_seq, mf->mf_mds);
188         
189         RETURN(rc);
190
191 }
192
193 static int fld_req_handle(struct ptlrpc_request *req,
194                           struct fld_thread_info *info)
195 {
196         struct lu_site *site;
197         struct md_fld *in;
198         struct md_fld *out;
199         int rc;
200         __u32 *opc;
201         ENTRY;
202
203         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
204
205         rc = req_capsule_pack(&info->fti_pill);
206         if (rc)
207                 RETURN(err_serious(rc));
208
209         opc = req_capsule_client_get(&info->fti_pill, &RMF_FLD_OPC);
210         if (opc != NULL) {
211                 in = req_capsule_client_get(&info->fti_pill, &RMF_FLD_MDFLD);
212                 if (in == NULL)
213                         RETURN(err_serious(-EPROTO));
214                 out = req_capsule_server_get(&info->fti_pill, &RMF_FLD_MDFLD);
215                 if (out == NULL)
216                         RETURN(err_serious(-EPROTO));
217                 *out = *in;
218
219                 rc = fld_server_handle(site->ls_server_fld,
220                                        req->rq_svc_thread->t_env,
221                                        *opc, out, info);
222         } else
223                 rc = err_serious(-EPROTO);
224
225         RETURN(rc);
226 }
227
228 static void fld_thread_info_init(struct ptlrpc_request *req,
229                                  struct fld_thread_info *info)
230 {
231         int i;
232
233         info->fti_flags = lustre_msg_get_flags(req->rq_reqmsg);
234
235         /* Mark rep buffer as req-layout stuff expects. */
236         for (i = 0; i < ARRAY_SIZE(info->fti_rep_buf_size); i++)
237                 info->fti_rep_buf_size[i] = -1;
238
239         /* Init request capsule. */
240         req_capsule_init(&info->fti_pill, req, RCL_SERVER,
241                          info->fti_rep_buf_size);
242
243         req_capsule_set(&info->fti_pill, &RQF_FLD_QUERY);
244 }
245
246 static void fld_thread_info_fini(struct fld_thread_info *info)
247 {
248         req_capsule_fini(&info->fti_pill);
249 }
250
251 static int fld_handle(struct ptlrpc_request *req)
252 {
253         struct fld_thread_info *info;
254         const struct lu_env *env;
255         int rc;
256
257         env = req->rq_svc_thread->t_env;
258         LASSERT(env != NULL);
259
260         info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
261         LASSERT(info != NULL);
262
263         fld_thread_info_init(req, info);
264         rc = fld_req_handle(req, info);
265         fld_thread_info_fini(info);
266
267         return rc;
268 }
269
270 /*
271  * Entry point for handling FLD RPCs called from MDT.
272  */
273 int fld_query(struct com_thread_info *info)
274 {
275         return fld_handle(info->cti_pill.rc_req);
276 }
277 EXPORT_SYMBOL(fld_query);
278
279 /*
280  * Returns true, if fid is local to this server node.
281  *
282  * WARNING: this function is *not* guaranteed to return false if fid is
283  * remote: it makes an educated conservative guess only.
284  *
285  * fid_is_local() is supposed to be used in assertion checks only.
286  */
287 int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
288 {
289         int result;
290
291         result = 1; /* conservatively assume fid is local */
292         if (site->ls_client_fld != NULL) {
293                 mdsno_t mds;
294                 int rc;
295
296                 rc = fld_cache_lookup(site->ls_client_fld->lcf_cache,
297                                       fid_seq(fid), &mds);
298                 if (rc == 0)
299                         result = (mds == site->ls_node_id);
300         }
301         return result;
302 }
303 EXPORT_SYMBOL(fid_is_local);
304
305 static void fld_server_proc_fini(struct lu_server_fld *fld);
306
307 #ifdef LPROCFS
308 static int fld_server_proc_init(struct lu_server_fld *fld)
309 {
310         int rc = 0;
311         ENTRY;
312
313         fld->lsf_proc_dir = lprocfs_register(fld->lsf_name,
314                                              fld_type_proc_dir,
315                                              fld_server_proc_list, fld);
316         if (IS_ERR(fld->lsf_proc_dir)) {
317                 rc = PTR_ERR(fld->lsf_proc_dir);
318                 RETURN(rc);
319         }
320
321         RETURN(rc);
322 }
323
324 static void fld_server_proc_fini(struct lu_server_fld *fld)
325 {
326         ENTRY;
327         if (fld->lsf_proc_dir != NULL) {
328                 if (!IS_ERR(fld->lsf_proc_dir))
329                         lprocfs_remove(&fld->lsf_proc_dir);
330                 fld->lsf_proc_dir = NULL;
331         }
332         EXIT;
333 }
334 #else
335 static int fld_server_proc_init(struct lu_server_fld *fld)
336 {
337         return 0;
338 }
339
340 static void fld_server_proc_fini(struct lu_server_fld *fld)
341 {
342         return;
343 }
344 #endif
345
346 int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
347                     const char *prefix, const struct lu_env *env)
348 {
349         int cache_size, cache_threshold;
350         int rc;
351         ENTRY;
352
353         snprintf(fld->lsf_name, sizeof(fld->lsf_name),
354                  "srv-%s", prefix);
355
356         cache_size = FLD_SERVER_CACHE_SIZE /
357                 sizeof(struct fld_cache_entry);
358
359         cache_threshold = cache_size *
360                 FLD_SERVER_CACHE_THRESHOLD / 100;
361
362         fld->lsf_cache = fld_cache_init(fld->lsf_name,
363                                         FLD_SERVER_HTABLE_SIZE,
364                                         cache_size, cache_threshold);
365         if (IS_ERR(fld->lsf_cache)) {
366                 rc = PTR_ERR(fld->lsf_cache);
367                 fld->lsf_cache = NULL;
368                 GOTO(out, rc);
369         }
370
371         rc = fld_index_init(fld, env, dt);
372         if (rc)
373                 GOTO(out, rc);
374
375         rc = fld_server_proc_init(fld);
376         if (rc)
377                 GOTO(out, rc);
378
379         EXIT;
380 out:
381         if (rc)
382                 fld_server_fini(fld, env);
383         return rc;
384 }
385 EXPORT_SYMBOL(fld_server_init);
386
387 void fld_server_fini(struct lu_server_fld *fld,
388                      const struct lu_env *env)
389 {
390         ENTRY;
391
392         fld_server_proc_fini(fld);
393         fld_index_fini(fld, env);
394
395         if (fld->lsf_cache != NULL) {
396                 if (!IS_ERR(fld->lsf_cache))
397                         fld_cache_fini(fld->lsf_cache);
398                 fld->lsf_cache = NULL;
399         }
400
401         EXIT;
402 }
403 EXPORT_SYMBOL(fld_server_fini);
404
405 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
406 MODULE_DESCRIPTION("Lustre FLD");
407 MODULE_LICENSE("GPL");
408
409 cfs_module(mdd, "0.1.0", fld_mod_init, fld_mod_exit);
410 #endif