Whamcloud - gitweb
d77acf277a5f70d7a638fddc90d451a28203e88e
[fs/lustre-release.git] / lustre / cmobd / cm_mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define DEBUG_SUBSYSTEM S_CMOBD
23
24 #include <linux/config.h>
25 #include <linux/module.h>
26 #include <linux/kernel.h>
27 #include <linux/obd_class.h>
28 #include <linux/lustre_net.h>
29 #include <linux/lustre_mds.h>
30 #include <linux/lustre_smfs.h>
31
32 #include "cm_internal.h"
33
34 /* converts mds_rec_setattr to struct iattr. */
35 static inline void cmobd_rec2iattr(struct mds_rec_setattr *rec,
36                                    struct iattr *iattr)
37 {
38         iattr->ia_uid = rec->sa_uid;
39         iattr->ia_gid = rec->sa_gid;
40         iattr->ia_mode = rec->sa_mode;
41         iattr->ia_size = rec->sa_size;
42         iattr->ia_valid = rec->sa_valid;
43         LTIME_S(iattr->ia_atime) = rec->sa_atime;
44         LTIME_S(iattr->ia_mtime) = rec->sa_mtime;
45         LTIME_S(iattr->ia_ctime) = rec->sa_ctime;
46         iattr->ia_attr_flags = rec->sa_attr_flags;
47 }
48
49 static void
50 cmobd_prepare_mdc_data(struct mdc_op_data *data, struct lustre_id *id1,
51                        struct lustre_id *id2, const char *name,
52                        int namelen, __u32 mode)
53 {
54         LASSERT(id1);
55         LASSERT(data);
56
57         memset(data, 0, sizeof(*data));
58
59         data->id1 = *id1;
60         if (id2)
61                 data->id2 = *id2;
62         else
63                 memset(&data->id2, 0, sizeof(data->id2));
64
65         data->valid = 0;
66         data->name = name;
67         data->namelen = namelen;
68         data->create_mode = mode;
69         data->mod_time = CURRENT_TIME;
70 }
71
72 /* If mdc_setattr() is called with an 'iattr', then it is a normal RPC that
73  * should take the normal semaphore and go to the normal portal.
74  *
75  * If it is called with iattr->ia_valid & ATTR_FROM_OPEN, then it is a magic
76  * open-path setattr that should take the setattr semaphore and go to the
77  * setattr portal. */
78 static int cmobd_reint_setattr(struct obd_device *obd, void *record)
79 {
80         struct cm_obd *cmobd = &obd->u.cm;
81         struct ptlrpc_request *req = NULL;
82         struct mds_kml_pack_info *mkpi;
83         struct mds_rec_setattr *rec;
84         struct mdc_op_data op_data;
85         struct lustre_msg *msg;
86         int ea1len, ea2len;
87         struct iattr iattr;
88         void *ea1, *ea2;
89         int rc = 0;
90         ENTRY;
91
92         mkpi = (struct mds_kml_pack_info *)record;
93         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
94
95         rec = lustre_msg_buf(msg, 0, 0);
96         if (!rec) 
97                 RETURN(-EINVAL);
98
99         /* converting setattr rec to struct iattr. */
100         cmobd_rec2iattr(rec, &iattr);
101
102         /* FIXME-UMKA: here should be handling of setattr() from open. Bug
103          * #249. Will be fixed later. */
104
105         /* converting localstore cookie to remote lustre_id. */
106         rc = mds_read_mid(cmobd->cache_obd, &rec->sa_id,
107                           &rec->sa_id, sizeof(rec->sa_id));
108         if (rc) {
109                 CERROR("Can't read master MDS store cookie "
110                        "from local inode EA, err = %d.\n", rc);
111                 RETURN(rc);
112         }
113
114         cmobd_prepare_mdc_data(&op_data, &rec->sa_id, NULL,
115                                NULL, 0, 0);
116
117         /* handling possible EAs. */
118         ea1 = lustre_msg_buf(msg, 1, 0);
119         ea1len = ea1 ? msg->buflens[1] : 0;
120
121         ea2 = lustre_msg_buf(msg, 2, 0);
122         ea2len = ea2 ? msg->buflens[2] : 0;
123
124         rc = md_setattr(cmobd->master_exp, &op_data, &iattr,
125                         ea1, ea1len, ea2, ea2len, &req);
126
127         if (req)
128                 ptlrpc_req_finished(req);
129         RETURN(rc);
130 }
131
132 static int cmobd_reint_create(struct obd_device *obd, void *record)
133 {
134         struct cm_obd *cmobd = &obd->u.cm;
135         struct ptlrpc_request *req = NULL;
136         struct mds_kml_pack_info *mkpi;
137         int rc = 0, namelen, datalen;
138         struct mds_rec_create *rec;
139         struct mdc_op_data op_data;
140         struct lustre_msg *msg;
141         struct mds_body *body;
142         struct lustre_id lid;
143         char *name, *data;
144         ENTRY;
145
146         mkpi = (struct mds_kml_pack_info *)record;
147         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
148
149         rec = lustre_msg_buf(msg, 0, 0);
150         if (!rec) 
151                 RETURN(-EINVAL);
152         
153         lid = rec->cr_replayid;
154
155         /* zeroing @rec->cr_replayid out in request, as master MDS should create
156          * own inode (with own store cookie). */
157         memset(&rec->cr_replayid, 0, sizeof(rec->cr_replayid));
158
159         /* converting local inode store cookie to remote lustre_id. */
160         rc = mds_read_mid(cmobd->cache_obd, &rec->cr_id,
161                           &rec->cr_id, sizeof(rec->cr_id));
162         if (rc) {
163                 CERROR("Can't read master MDS store cookie "
164                        "from local inode EA, err = %d.\n", rc);
165                 RETURN(rc);
166         }
167
168         /* getting name to be created and its length */
169         name = lustre_msg_string(msg, 1, 0);
170         namelen = name ? msg->buflens[1] - 1 : 0;
171
172         /* getting misc data (symlink) and its length */
173         data = (char *)lustre_msg_buf(msg, 2, 0);
174         datalen = data ? msg->buflens[2] : 0;
175
176         /* prepare mdc request data. */
177         cmobd_prepare_mdc_data(&op_data, &rec->cr_id, &rec->cr_replayid,
178                                name, namelen, rec->cr_mode);
179
180         /* requesting to master to create object with passed attributes. */
181         rc = md_create(cmobd->master_exp, &op_data, data, datalen,
182                        rec->cr_mode, current->fsuid, current->fsgid,
183                        rec->cr_rdev, &req);
184
185         if (!rc) {
186                 /* here we save store cookie from master MDS to local 
187                  * inode EA. */
188                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
189
190                 rc = mds_update_mid(cmobd->cache_obd, &lid,
191                                     &body->id1, sizeof(body->id1));
192         }
193
194         if (req)
195                 ptlrpc_req_finished(req);
196         RETURN(rc);
197 }
198
199 static int cmobd_reint_unlink(struct obd_device *obd, void *record)
200 {
201         struct cm_obd *cmobd = &obd->u.cm;
202         struct ptlrpc_request *req = NULL;
203         struct mds_kml_pack_info *mkpi;
204         struct mdc_op_data op_data;
205         struct mds_rec_unlink *rec;
206         struct lustre_msg *msg;
207         int rc = 0, namelen;
208         char *name = NULL;
209         ENTRY;
210         
211         mkpi = (struct mds_kml_pack_info *)record;
212         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
213
214         rec = lustre_msg_buf(msg, 0, 0);
215         if (!rec) 
216                 RETURN(-EINVAL);
217
218         /* converting local store cookie to remote lustre_id. */
219         rc = mds_read_mid(cmobd->cache_obd, &rec->ul_id1,
220                           &rec->ul_id1, sizeof(rec->ul_id1));
221         if (rc) {
222                 CERROR("Can't read master MDS store cookie "
223                        "from local inode EA, err = %d.\n", rc);
224                 RETURN(rc);
225         }
226
227         /* getting name to be created and its length */
228         name = lustre_msg_string(msg, 1, 0);
229         namelen = name ? msg->buflens[1] - 1 : 0;
230
231         /* prepare mdc request data. */
232         cmobd_prepare_mdc_data(&op_data, &rec->ul_id1, NULL,
233                                name, namelen, rec->ul_mode);
234
235         rc = md_unlink(cmobd->master_exp, &op_data, &req);
236
237         if (req)
238                 ptlrpc_req_finished(req);
239         RETURN(rc);
240 }
241
242 static int cmobd_reint_link(struct obd_device *obd, void *record)
243 {
244         struct cm_obd *cmobd = &obd->u.cm;
245         struct ptlrpc_request *req = NULL;
246         struct mds_kml_pack_info *mkpi;
247         struct mdc_op_data op_data;
248         struct mds_rec_link *rec;
249         struct lustre_msg *msg;
250         int rc = 0, namelen;
251         char *name;
252         ENTRY;
253
254         mkpi = (struct mds_kml_pack_info *)record;
255         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
256         
257         rec = lustre_msg_buf(msg, 0, 0);
258         if (!rec) 
259                 RETURN(-EINVAL);
260
261         /* converting local store cookie for both ids to remote lustre_id. */
262         rc = mds_read_mid(cmobd->cache_obd, &rec->lk_id1,
263                           &rec->lk_id1, sizeof(rec->lk_id1));
264         if (rc) {
265                 CERROR("Can't read master MDS store cookie "
266                        "from local inode EA, err = %d.\n", rc);
267                 RETURN(rc);
268         }
269         
270         rc = mds_read_mid(cmobd->cache_obd, &rec->lk_id2,
271                           &rec->lk_id2, sizeof(rec->lk_id2));
272         if (rc) {
273                 CERROR("Can't read master MDS store cookie "
274                        "from local inode EA, err = %d.\n", rc);
275                 RETURN(rc);
276         }
277         
278         /* getting name to be created and its length */
279         name = lustre_msg_string(msg, 1, 0);
280         namelen = name ? msg->buflens[1] - 1: 0;
281
282         /* prepare mdc request data. */
283         cmobd_prepare_mdc_data(&op_data, &rec->lk_id1, &rec->lk_id2,
284                                name, namelen, 0);
285
286         rc = md_link(cmobd->master_exp, &op_data, &req);
287
288         if (req)
289                 ptlrpc_req_finished(req);
290         RETURN(rc);
291 }
292
293 static int cmobd_reint_rename(struct obd_device *obd, void *record)
294 {
295         struct cm_obd *cmobd = &obd->u.cm;
296         struct ptlrpc_request *req = NULL;
297         struct mds_kml_pack_info *mkpi;
298         struct mdc_op_data op_data;
299         struct mds_rec_rename *rec;
300         int rc = 0, oldlen, newlen;
301         struct lustre_msg *msg;
302         char *old, *new;
303         ENTRY;
304
305         mkpi = (struct mds_kml_pack_info *)record;
306         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
307         
308         rec = lustre_msg_buf(msg, 0, 0);
309         if (!rec) 
310                 RETURN(-EINVAL);
311         
312         /* converting local store cookie for both ids to remote lustre_id. */
313         rc = mds_read_mid(cmobd->cache_obd, &rec->rn_id1,
314                           &rec->rn_id1, sizeof(rec->rn_id1));
315         if (rc) {
316                 CERROR("Can't read master MDS store cookie "
317                        "from local inode EA, err = %d.\n", rc);
318                 RETURN(rc);
319         }
320         
321         rc = mds_read_mid(cmobd->cache_obd, &rec->rn_id2,
322                           &rec->rn_id2, sizeof(rec->rn_id2));
323         if (rc) {
324                 CERROR("Can't read master MDS store cookie "
325                        "from local inode EA, err = %d.\n", rc);
326                 RETURN(rc);
327         }
328
329         /* getting old name and its length */
330         old = lustre_msg_string(msg, 1, 0);
331         oldlen = old ? msg->buflens[1] - 1 : 0;
332
333         /* getting new len and its length */
334         new = lustre_msg_string(msg, 2, 0);
335         newlen = new ? msg->buflens[2] - 1: 0;
336         
337         /* prepare mdc request data. */
338         cmobd_prepare_mdc_data(&op_data, &rec->rn_id1, &rec->rn_id1,
339                                NULL, 0, 0);
340
341         rc = md_rename(cmobd->master_exp, &op_data, old, oldlen,
342                        new, newlen, &req);
343
344         if (req)
345                 ptlrpc_req_finished(req);
346         RETURN(rc);
347 }
348
349 typedef int (*cmobd_reint_rec_func_t)(struct obd_device *, void *);
350
351 static cmobd_reint_rec_func_t mds_reint_handler[REINT_MAX + 1] = {
352         [REINT_SETATTR] cmobd_reint_setattr,
353         [REINT_CREATE] cmobd_reint_create,
354         [REINT_LINK] cmobd_reint_link,
355         [REINT_UNLINK] cmobd_reint_unlink,
356         [REINT_RENAME] cmobd_reint_rename,
357 };
358
359 int cmobd_reint_mds(struct obd_device *obd, void *record, int dummy)
360 {
361         struct mds_kml_pack_info *mkpi;
362         struct lustre_msg *msg;
363         __u32 opcode;
364         
365         mkpi = (struct mds_kml_pack_info *)record;
366         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
367         
368         opcode = *(__u32 *)lustre_msg_buf(msg, 0, 0);
369         
370         if (opcode > REINT_MAX || opcode <= 0) {
371                 CERROR("Invalid mds reint opcode %u\n",
372                        opcode);
373                 return -EINVAL;
374         }
375         
376         return mds_reint_handler[opcode](obd, record);
377