Whamcloud - gitweb
ab26a2a11e673cb3d2242af0339054f082b23a89
[fs/lustre-release.git] / lustre / fld / fld_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_handler.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: WangDi <wangdi@clusterfs.com>
9  *           Yury Umanets <umka@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FLD
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 # include <linux/jbd.h>
38 # include <asm/div64.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 # include <libcfs/list.h>
42 #endif
43
44 #include <obd.h>
45 #include <obd_class.h>
46 #include <lustre_ver.h>
47 #include <obd_support.h>
48 #include <lprocfs_status.h>
49
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include "fld_internal.h"
53
54 #ifdef __KERNEL__
55 static void *fld_key_init(const struct lu_context *ctx,
56                           struct lu_context_key *key)
57 {
58         struct fld_thread_info *info;
59         ENTRY;
60
61         OBD_ALLOC_PTR(info);
62         if (info == NULL)
63                 info = ERR_PTR(-ENOMEM);
64         RETURN(info);
65 }
66
67 static void fld_key_fini(const struct lu_context *ctx,
68                          struct lu_context_key *key, void *data)
69 {
70         struct fld_thread_info *info = data;
71         ENTRY;
72         OBD_FREE_PTR(info);
73         EXIT;
74 }
75
76 struct lu_context_key fld_thread_key = {
77         .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD,
78         .lct_init = fld_key_init,
79         .lct_fini = fld_key_fini
80 };
81
82 static int __init fld_mod_init(void)
83 {
84         printk(KERN_INFO "Lustre: Fid Location Database; info@clusterfs.com\n");
85         lu_context_key_register(&fld_thread_key);
86         return 0;
87 }
88
89 static void __exit fld_mod_exit(void)
90 {
91         lu_context_key_degister(&fld_thread_key);
92         return;
93 }
94
95 /* insert index entry and update cache */
96 int fld_server_create(struct lu_server_fld *fld,
97                       const struct lu_context *ctx,
98                       seqno_t seq, mdsno_t mds)
99 {
100         return fld_index_create(fld, ctx, seq, mds);
101 }
102 EXPORT_SYMBOL(fld_server_create);
103
104 /* delete index entry */
105 int fld_server_delete(struct lu_server_fld *fld,
106                       const struct lu_context *ctx,
107                       seqno_t seq)
108 {
109         return fld_index_delete(fld, ctx, seq);
110 }
111 EXPORT_SYMBOL(fld_server_delete);
112
113 /* issue on-disk index lookup */
114 int fld_server_lookup(struct lu_server_fld *fld,
115                       const struct lu_context *ctx,
116                       seqno_t seq, mdsno_t *mds)
117 {
118         return fld_index_lookup(fld, ctx, seq, mds);
119 }
120 EXPORT_SYMBOL(fld_server_lookup);
121
122 static int fld_server_handle(struct lu_server_fld *fld,
123                              const struct lu_context *ctx,
124                              __u32 opc, struct md_fld *mf,
125                              struct fld_thread_info *info)
126 {
127         int rc;
128         ENTRY;
129
130         switch (opc) {
131         case FLD_CREATE:
132                 rc = fld_server_create(fld, ctx,
133                                        mf->mf_seq, mf->mf_mds);
134
135                 /* do not return -EEXIST error for resent case */
136                 if ((info->fti_flags & FLD_MSG_RESENT) && rc == -EEXIST)
137                         rc = 0;
138                 break;
139         case FLD_DELETE:
140                 rc = fld_server_delete(fld, ctx, mf->mf_seq);
141
142                 /* do not return -ENOENT error for resent case */
143                 if ((info->fti_flags & FLD_MSG_RESENT) && rc == -ENOENT)
144                         rc = 0;
145                 break;
146         case FLD_LOOKUP:
147                 rc = fld_server_lookup(fld, ctx,
148                                        mf->mf_seq, &mf->mf_mds);
149                 break;
150         default:
151                 rc = -EINVAL;
152                 break;
153         }
154         RETURN(rc);
155
156 }
157
158 static int fld_req_handle(struct ptlrpc_request *req,
159                           struct fld_thread_info *info)
160 {
161         struct lu_site *site;
162         struct md_fld *in;
163         struct md_fld *out;
164         int rc = -EPROTO;
165         __u32 *opc;
166         ENTRY;
167
168         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
169         
170         rc = req_capsule_pack(&info->fti_pill);
171         if (rc)
172                 RETURN(rc);
173
174         opc = req_capsule_client_get(&info->fti_pill, &RMF_FLD_OPC);
175         if (opc != NULL) {
176                 in = req_capsule_client_get(&info->fti_pill, &RMF_FLD_MDFLD);
177                 if (in == NULL)
178                         RETURN(-EPROTO);
179                 out = req_capsule_server_get(&info->fti_pill, &RMF_FLD_MDFLD);
180                 if (out == NULL)
181                         RETURN(-EPROTO);
182                 *out = *in;
183
184                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
185                         info->fti_flags |= FLD_MSG_REPLAY;
186
187                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)
188                         info->fti_flags |= FLD_MSG_RESENT;
189                 
190                 rc = fld_server_handle(site->ls_server_fld,
191                                        req->rq_svc_thread->t_ctx,
192                                        *opc, out, info);
193         } else
194                 rc = -EPROTO;
195
196         RETURN(rc);
197 }
198
199 static void fld_thread_info_init(struct ptlrpc_request *req,
200                                  struct fld_thread_info *info)
201 {
202         int i;
203
204         info->fti_flags = 0;
205         
206         /* mark rep buffer as req-layout stuff expects */
207         for (i = 0; i < ARRAY_SIZE(info->fti_rep_buf_size); i++)
208                 info->fti_rep_buf_size[i] = -1;
209
210         /* init request capsule */
211         req_capsule_init(&info->fti_pill, req, RCL_SERVER,
212                          info->fti_rep_buf_size);
213
214         req_capsule_set(&info->fti_pill, &RQF_FLD_QUERY);
215 }
216
217 static void fld_thread_info_fini(struct fld_thread_info *info)
218 {
219         req_capsule_fini(&info->fti_pill);
220 }
221
222 static int fld_handle(struct ptlrpc_request *req)
223 {
224         const struct lu_context *ctx;
225         struct fld_thread_info *info;
226         int rc;
227         
228         ctx = req->rq_svc_thread->t_ctx;
229         LASSERT(ctx != NULL);
230         LASSERT(ctx->lc_thread == req->rq_svc_thread);
231
232         info = lu_context_key_get(ctx, &fld_thread_key);
233         LASSERT(info != NULL);
234
235         fld_thread_info_init(req, info);
236         rc = fld_req_handle(req, info);
237         fld_thread_info_fini(info);
238
239         return rc;
240 }
241
242 /*
243  * Entry point for handling FLD RPCs called from MDT.
244  */
245 int fld_query(struct com_thread_info *info)
246 {
247         return fld_handle(info->cti_pill.rc_req);
248 }
249 EXPORT_SYMBOL(fld_query);
250
251 /*
252  * Returns true, if fid is local to this server node.
253  *
254  * WARNING: this function is *not* guaranteed to return false if fid is
255  * remote: it makes an educated conservative guess only.
256  *
257  * fid_is_local() is supposed to be used in assertion checks only.
258  */
259 int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
260 {
261         int result;
262
263         result = 1; /* conservatively assume fid is local */
264         if (site->ls_client_fld != NULL) {
265                 mdsno_t mds;
266                 int rc;
267
268                 rc = fld_cache_lookup(site->ls_client_fld->lcf_cache,
269                                       fid_seq(fid), &mds);
270                 if (rc == 0)
271                         result = (mds == site->ls_node_id);
272         }
273         return result;
274 }
275 EXPORT_SYMBOL(fid_is_local);
276
277 static void fld_server_proc_fini(struct lu_server_fld *fld);
278
279 #ifdef LPROCFS
280 static int fld_server_proc_init(struct lu_server_fld *fld)
281 {
282         int rc = 0;
283         ENTRY;
284
285         fld->lsf_proc_dir = lprocfs_register(fld->lsf_name,
286                                              proc_lustre_root,
287                                              fld_server_proc_list, fld);
288         if (IS_ERR(fld->lsf_proc_dir)) {
289                 rc = PTR_ERR(fld->lsf_proc_dir);
290                 RETURN(rc);
291         }
292
293         RETURN(rc);
294 }
295
296 static void fld_server_proc_fini(struct lu_server_fld *fld)
297 {
298         ENTRY;
299         if (fld->lsf_proc_dir != NULL) {
300                 if (!IS_ERR(fld->lsf_proc_dir))
301                         lprocfs_remove(fld->lsf_proc_dir);
302                 fld->lsf_proc_dir = NULL;
303         }
304         EXIT;
305 }
306 #else
307 static int fld_server_proc_init(struct lu_server_fld *fld)
308 {
309         return 0;
310 }
311
312 static void fld_server_proc_fini(struct lu_server_fld *fld)
313 {
314         return;
315 }
316 #endif
317
318 int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
319                     const char *uuid, const struct lu_context *ctx)
320 {
321         int rc;
322         ENTRY;
323
324         snprintf(fld->lsf_name, sizeof(fld->lsf_name),
325                  "%s-srv-%s", LUSTRE_FLD_NAME, uuid);
326
327         rc = fld_index_init(fld, ctx, dt);
328         if (rc)
329                 GOTO(out, rc);
330
331         rc = fld_server_proc_init(fld);
332         if (rc)
333                 GOTO(out, rc);
334
335         EXIT;
336 out:
337         if (rc)
338                 fld_server_fini(fld, ctx);
339         else
340                 CDEBUG(D_INFO|D_WARNING, "Server FLD\n");
341         return rc;
342 }
343 EXPORT_SYMBOL(fld_server_init);
344
345 void fld_server_fini(struct lu_server_fld *fld,
346                      const struct lu_context *ctx)
347 {
348         ENTRY;
349
350         fld_server_proc_fini(fld);
351         fld_index_fini(fld, ctx);
352         
353         EXIT;
354 }
355 EXPORT_SYMBOL(fld_server_fini);
356
357 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
358 MODULE_DESCRIPTION("Lustre FLD");
359 MODULE_LICENSE("GPL");
360
361 cfs_module(mdd, "0.1.0", fld_mod_init, fld_mod_exit);
362 #endif