Whamcloud - gitweb
current branches now use lnet from HEAD
[fs/lustre-release.git] / lustre / cmobd / cm_mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define DEBUG_SUBSYSTEM S_CMOBD
23
24 #include <linux/config.h>
25 #include <linux/module.h>
26 #include <linux/kernel.h>
27 #include <linux/obd_class.h>
28 #include <linux/lustre_net.h>
29 #include <linux/lustre_mds.h>
30 #include <linux/lustre_smfs.h>
31
32 #include "cm_internal.h"
33
34 /* converts mds_rec_setattr to struct iattr. */
35 static inline void cmobd_rec2iattr(struct mds_rec_setattr *rec,
36                                    struct iattr *iattr)
37 {
38         iattr->ia_uid = rec->sa_uid;
39         iattr->ia_gid = rec->sa_gid;
40         iattr->ia_mode = rec->sa_mode;
41         iattr->ia_size = rec->sa_size;
42         iattr->ia_valid = rec->sa_valid;
43         LTIME_S(iattr->ia_atime) = rec->sa_atime;
44         LTIME_S(iattr->ia_mtime) = rec->sa_mtime;
45         LTIME_S(iattr->ia_ctime) = rec->sa_ctime;
46         iattr->ia_attr_flags = rec->sa_attr_flags;
47 }
48
49 static void
50 cmobd_prepare_mdc_data(struct mdc_op_data *data, struct lustre_id *id1,
51                        struct lustre_id *id2, const char *name,
52                        int namelen, __u32 mode, __u32 flags)
53 {
54         LASSERT(id1);
55         LASSERT(data);
56
57         memset(data, 0, sizeof(*data));
58
59         data->id1 = *id1;
60         if (id2)
61                 data->id2 = *id2;
62
63         data->valid = 0;
64         data->name = name;
65         data->flags = flags;
66         data->namelen = namelen;
67         data->create_mode = mode;
68         data->mod_time = LTIME_S(CURRENT_TIME);
69
70         /* zeroing out store cookie, as it makes no sense on master MDS and may
71          * also confuse it as may be considered as recovery case. */
72         memset(&data->id1.li_stc, 0, sizeof(data->id1.li_stc));
73         memset(&data->id2.li_stc, 0, sizeof(data->id2.li_stc));
74 }
75
76 /* If mdc_setattr() is called with an 'iattr', then it is a normal RPC that
77  * should take the normal semaphore and go to the normal portal.
78  *
79  * If it is called with iattr->ia_valid & ATTR_FROM_OPEN, then it is a magic
80  * open-path setattr that should take the setattr semaphore and go to the
81  * setattr portal. */
82 static int cmobd_reint_setattr(struct obd_device *obd, void *record)
83 {
84         struct cm_obd *cmobd = &obd->u.cm;
85         struct ptlrpc_request *req = NULL;
86         struct mds_kml_pack_info *mkpi;
87         struct mds_rec_setattr *rec;
88         struct mdc_op_data *op_data;
89         struct lustre_msg *msg;
90         int ea1len, ea2len;
91         struct iattr iattr;
92         void *ea1, *ea2;
93         int rc = 0;
94         ENTRY;
95
96         mkpi = (struct mds_kml_pack_info *)record;
97         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
98
99         rec = lustre_msg_buf(msg, 0, 0);
100         if (!rec) 
101                 RETURN(-EINVAL);
102
103         /* converting setattr rec to struct iattr. */
104         cmobd_rec2iattr(rec, &iattr);
105
106         /* FIXME-UMKA: here should be handling of setattr() from open. Bug
107          * #249. Will be fixed later. */
108
109         OBD_ALLOC(op_data, sizeof(*op_data));
110         if (op_data == NULL)
111                 RETURN(-ENOMEM);
112         cmobd_prepare_mdc_data(op_data, &rec->sa_id, NULL,
113                                NULL, 0, 0, MDS_REINT_REQ);
114
115         /* handling possible EAs. */
116         ea1 = lustre_msg_buf(msg, 1, 0);
117         ea1len = ea1 ? msg->buflens[1] : 0;
118
119         ea2 = lustre_msg_buf(msg, 2, 0);
120         ea2len = ea2 ? msg->buflens[2] : 0;
121
122         rc = md_setattr(cmobd->master_exp, op_data, &iattr,
123                         ea1, ea1len, ea2, ea2len, NULL, 0, &req);
124         OBD_FREE(op_data, sizeof(*op_data));
125
126         if (req)
127                 ptlrpc_req_finished(req);
128         RETURN(rc);
129 }
130  
131 static int cmobd_reint_create(struct obd_device *obd, void *record)
132 {
133         struct cm_obd *cmobd = &obd->u.cm;
134         struct ptlrpc_request *req = NULL;
135         struct mds_kml_pack_info *mkpi;
136         int rc = 0, namelen, datalen;
137         struct mdc_op_data *op_data;
138         struct mds_rec_create *rec;
139         struct lustre_msg *msg;
140         char *name, *data;
141         ENTRY;
142
143         mkpi = (struct mds_kml_pack_info *)record;
144         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
145
146         rec = lustre_msg_buf(msg, 0, 0);
147         if (!rec) 
148                 RETURN(-EINVAL);
149
150         /* getting name to be created and its length */
151         name = lustre_msg_string(msg, 1, 0);
152         namelen = name ? msg->buflens[1] - 1 : 0;
153   
154         /* getting misc data (symlink) and its length */
155         data = (char *)lustre_msg_buf(msg, 2, 0);
156         datalen = data ? msg->buflens[2] : 0;       
157
158         OBD_ALLOC(op_data, sizeof(*op_data));
159         if (op_data == NULL) 
160                 GOTO(exit, rc = -ENOMEM);
161
162         /* XXX: here is the issue preventing LMV from being used as master
163          * device for flushing cache to it. It is allusive to the fact that
164          * cache MDS parent id with wrong group component is used for forwarding
165          * reint requests to some MDS from those LMV knows about. As group is
166          * wrong - LMV forwards reqs to wrong MDS. Do not know how to fix it
167          * yet.  --umka */
168  
169         /* prepare mdc request data. */
170         cmobd_prepare_mdc_data(op_data, &rec->cr_id, &rec->cr_replayid,
171                                name, namelen, rec->cr_mode, MDS_REINT_REQ);
172
173         /* requesting to master to create object with passed attributes. */
174         rc = md_create(cmobd->master_exp, op_data, data, datalen,
175                        rec->cr_mode, current->fsuid, current->fsgid,
176                        rec->cr_rdev, &req);
177         OBD_FREE(op_data, sizeof(*op_data));
178 exit:
179         if (req)
180                 ptlrpc_req_finished(req);
181         
182         RETURN(rc);
183 }
184
185 static int cmobd_reint_unlink(struct obd_device *obd, void *record)
186 {
187         struct cm_obd *cmobd = &obd->u.cm;
188         struct ptlrpc_request *req = NULL;
189         struct mds_kml_pack_info *mkpi;
190         struct mdc_op_data *op_data;
191         struct mds_rec_unlink *rec;
192         struct lustre_msg *msg;
193         int rc = 0, namelen;
194         char *name = NULL;
195         ENTRY;
196         
197         mkpi = (struct mds_kml_pack_info *)record;
198         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
199
200         rec = lustre_msg_buf(msg, 0, 0);
201         if (!rec) 
202                 RETURN(-EINVAL);
203
204         /* getting name to be created and its length */
205         name = lustre_msg_string(msg, 1, 0);
206         namelen = name ? msg->buflens[1] - 1 : 0;
207
208         OBD_ALLOC(op_data, sizeof(*op_data));
209         if (op_data == NULL)
210                 RETURN(-ENOMEM);
211
212         /* prepare mdc request data. */
213         cmobd_prepare_mdc_data(op_data, &rec->ul_id1, NULL,
214                                name, namelen, rec->ul_mode,
215                                MDS_REINT_REQ);
216
217         rc = md_unlink(cmobd->master_exp, op_data, &req);
218         OBD_FREE(op_data, sizeof(*op_data));
219
220         if (req)
221                 ptlrpc_req_finished(req);
222         RETURN(rc);
223 }
224
225 static int cmobd_reint_link(struct obd_device *obd, void *record)
226 {
227         struct cm_obd *cmobd = &obd->u.cm;
228         struct ptlrpc_request *req = NULL;
229         struct mds_kml_pack_info *mkpi;
230         struct mdc_op_data *op_data;
231         struct mds_rec_link *rec;
232         struct lustre_msg *msg;
233         int rc = 0, namelen;
234         char *name;
235         ENTRY;
236
237         mkpi = (struct mds_kml_pack_info *)record;
238         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
239         
240         rec = lustre_msg_buf(msg, 0, 0);
241         if (!rec) 
242                 RETURN(-EINVAL);
243
244         /* getting name to be created and its length */
245         name = lustre_msg_string(msg, 1, 0);
246         namelen = name ? msg->buflens[1] - 1: 0;
247
248         OBD_ALLOC(op_data, sizeof(*op_data));
249         if (op_data == NULL)
250                 RETURN(-ENOMEM);
251
252         /* prepare mdc request data. */
253         cmobd_prepare_mdc_data(op_data, &rec->lk_id1, &rec->lk_id2,
254                                name, namelen, 0, MDS_REINT_REQ);
255
256         rc = md_link(cmobd->master_exp, op_data, &req);
257         OBD_FREE(op_data, sizeof(*op_data));
258
259         if (req)
260                 ptlrpc_req_finished(req);
261         RETURN(rc);
262 }
263
264 static int cmobd_reint_rename(struct obd_device *obd, void *record)
265 {
266         struct cm_obd *cmobd = &obd->u.cm;
267         struct ptlrpc_request *req = NULL;
268         struct mds_kml_pack_info *mkpi;
269         struct mdc_op_data *op_data;
270         struct mds_rec_rename *rec;
271         int rc = 0, oldlen, newlen;
272         struct lustre_msg *msg;
273         char *old, *new;
274         ENTRY;
275
276         mkpi = (struct mds_kml_pack_info *)record;
277         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
278         
279         rec = lustre_msg_buf(msg, 0, 0);
280         if (!rec) 
281                 RETURN(-EINVAL);
282
283         /* getting old name and its length */
284         old = lustre_msg_string(msg, 1, 0);
285         oldlen = old ? msg->buflens[1] - 1 : 0;
286
287         /* getting new len and its length */
288         new = lustre_msg_string(msg, 2, 0);
289         newlen = new ? msg->buflens[2] - 1: 0;
290         
291         OBD_ALLOC(op_data, sizeof(*op_data));
292         if (op_data == NULL)
293                 RETURN(-ENOMEM);
294         
295         /* prepare mdc request data. */
296         cmobd_prepare_mdc_data(op_data, &rec->rn_id1, &rec->rn_id1,
297                                NULL, 0, 0, MDS_REINT_REQ);
298
299         rc = md_rename(cmobd->master_exp, op_data, old, oldlen,
300                        new, newlen, &req);
301         OBD_FREE(op_data, sizeof(*op_data));
302
303         if (req)
304                 ptlrpc_req_finished(req);
305         RETURN(rc);
306 }
307
308 typedef int (*cmobd_reint_rec_func_t)(struct obd_device *, void *);
309
310 static cmobd_reint_rec_func_t mds_reint_handler[REINT_MAX + 1] = {
311         [REINT_SETATTR] cmobd_reint_setattr,
312         [REINT_CREATE] cmobd_reint_create,
313         [REINT_LINK] cmobd_reint_link,
314         [REINT_UNLINK] cmobd_reint_unlink,
315         [REINT_RENAME] cmobd_reint_rename,
316 };
317
318 int cmobd_reint_mds(struct obd_device *obd, void *record, int dummy)
319 {
320         struct mds_kml_pack_info *mkpi;
321         struct lustre_msg *msg;
322         __u32 opcode;
323         
324         mkpi = (struct mds_kml_pack_info *)record;
325         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
326         
327         opcode = *(__u32 *)lustre_msg_buf(msg, 0, 0);
328         
329         if (opcode > REINT_MAX || opcode <= 0) {
330                 CERROR("Invalid mds reint opcode %u\n",
331                        opcode);
332                 return -EINVAL;
333         }
334         
335         return mds_reint_handler[opcode](obd, record);
336