Whamcloud - gitweb
b=3031
[fs/lustre-release.git] / lustre / cmobd / cm_mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define DEBUG_SUBSYSTEM S_CMOBD
23
24 #include <linux/config.h>
25 #include <linux/module.h>
26 #include <linux/kernel.h>
27 #include <linux/obd_class.h>
28 #include <linux/lustre_net.h>
29 #include <linux/lustre_mds.h>
30 #include <linux/lustre_smfs.h>
31
32 #include "cm_internal.h"
33
34 /* converts mds_rec_setattr to struct iattr. */
35 static inline void cmobd_rec2iattr(struct mds_rec_setattr *rec,
36                                    struct iattr *iattr)
37 {
38         iattr->ia_uid = rec->sa_uid;
39         iattr->ia_gid = rec->sa_gid;
40         iattr->ia_mode = rec->sa_mode;
41         iattr->ia_size = rec->sa_size;
42         iattr->ia_valid = rec->sa_valid;
43         LTIME_S(iattr->ia_atime) = rec->sa_atime;
44         LTIME_S(iattr->ia_mtime) = rec->sa_mtime;
45         LTIME_S(iattr->ia_ctime) = rec->sa_ctime;
46         iattr->ia_attr_flags = rec->sa_attr_flags;
47 }
48
49 static void
50 cmobd_prepare_mdc_data(struct mdc_op_data *data, struct lustre_id *id1,
51                        struct lustre_id *id2, const char *name,
52                        int namelen, __u32 mode)
53 {
54         LASSERT(id1);
55         LASSERT(data);
56
57         memset(data, 0, sizeof(*data));
58
59         data->id1 = *id1;
60         if (id2)
61                 data->id2 = *id2;
62         else
63                 memset(&data->id2, 0, sizeof(data->id2));
64
65         data->valid = 0;
66         data->name = name;
67         data->namelen = namelen;
68         data->create_mode = mode;
69         data->mod_time = LTIME_S(CURRENT_TIME);
70 }
71
72 /* If mdc_setattr() is called with an 'iattr', then it is a normal RPC that
73  * should take the normal semaphore and go to the normal portal.
74  *
75  * If it is called with iattr->ia_valid & ATTR_FROM_OPEN, then it is a magic
76  * open-path setattr that should take the setattr semaphore and go to the
77  * setattr portal. */
78 static int cmobd_reint_setattr(struct obd_device *obd, void *record)
79 {
80         struct cm_obd *cmobd = &obd->u.cm;
81         struct ptlrpc_request *req = NULL;
82         struct mds_kml_pack_info *mkpi;
83         struct mds_rec_setattr *rec;
84         struct mdc_op_data *op_data;
85         struct lustre_msg *msg;
86         int ea1len, ea2len;
87         struct iattr iattr;
88         void *ea1, *ea2;
89         int rc = 0;
90         ENTRY;
91
92         mkpi = (struct mds_kml_pack_info *)record;
93         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
94
95         rec = lustre_msg_buf(msg, 0, 0);
96         if (!rec) 
97                 RETURN(-EINVAL);
98
99         /* converting setattr rec to struct iattr. */
100         cmobd_rec2iattr(rec, &iattr);
101
102         /* FIXME-UMKA: here should be handling of setattr() from open. Bug
103          * #249. Will be fixed later. */
104
105         /* converting localstore cookie to remote lustre_id. */
106         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->sa_id,
107                           &rec->sa_id, sizeof(rec->sa_id));
108         if (rc) {
109                 CERROR("Can't read master MDS store cookie "
110                        "from local inode EA, err = %d.\n", rc);
111                 RETURN(rc);
112         }
113
114         OBD_ALLOC(op_data, sizeof(*op_data));
115         if (op_data == NULL)
116                 RETURN(-ENOMEM);
117         cmobd_prepare_mdc_data(op_data, &rec->sa_id, NULL,
118                                NULL, 0, 0);
119
120         /* handling possible EAs. */
121         ea1 = lustre_msg_buf(msg, 1, 0);
122         ea1len = ea1 ? msg->buflens[1] : 0;
123
124         ea2 = lustre_msg_buf(msg, 2, 0);
125         ea2len = ea2 ? msg->buflens[2] : 0;
126
127         rc = md_setattr(cmobd->master_exp, op_data, &iattr,
128                         ea1, ea1len, ea2, ea2len, &req);
129         OBD_FREE(op_data, sizeof(*op_data));
130
131         if (req)
132                 ptlrpc_req_finished(req);
133         RETURN(rc);
134 }
135
136 static int cmobd_reint_create(struct obd_device *obd, void *record)
137 {
138         struct cm_obd *cmobd = &obd->u.cm;
139         struct ptlrpc_request *req = NULL;
140         struct mds_kml_pack_info *mkpi;
141         int rc = 0, namelen, datalen;
142         struct mds_rec_create *rec;
143         struct mdc_op_data *op_data;
144         struct lustre_msg *msg;
145         struct mds_body *body;
146         struct lustre_id lid;
147         char *name, *data;
148         ENTRY;
149
150         mkpi = (struct mds_kml_pack_info *)record;
151         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
152
153         rec = lustre_msg_buf(msg, 0, 0);
154         if (!rec) 
155                 RETURN(-EINVAL);
156         
157         lid = rec->cr_replayid;
158
159         /* zeroing @rec->cr_replayid out in request, as master MDS should create
160          * own inode (with own store cookie). */
161         memset(&rec->cr_replayid, 0, sizeof(rec->cr_replayid));
162
163         /* converting local inode store cookie to remote lustre_id. */
164         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->cr_id,
165                           &rec->cr_id, sizeof(rec->cr_id));
166         if (rc) {
167                 CERROR("Can't read master MDS store cookie "
168                        "from local inode EA, err = %d.\n", rc);
169                 RETURN(rc);
170         }
171
172         /* getting name to be created and its length */
173         name = lustre_msg_string(msg, 1, 0);
174         namelen = name ? msg->buflens[1] - 1 : 0;
175
176         /* getting misc data (symlink) and its length */
177         data = (char *)lustre_msg_buf(msg, 2, 0);
178         datalen = data ? msg->buflens[2] : 0;
179
180         OBD_ALLOC(op_data, sizeof(*op_data));
181         if (op_data == NULL)
182                 RETURN(-ENOMEM);
183         
184         /* prepare mdc request data. */
185         cmobd_prepare_mdc_data(op_data, &rec->cr_id, &rec->cr_replayid,
186                                name, namelen, rec->cr_mode);
187
188         /* requesting to master to create object with passed attributes. */
189         rc = md_create(cmobd->master_exp, op_data, data, datalen,
190                        rec->cr_mode, current->fsuid, current->fsgid,
191                        rec->cr_rdev, &req);
192         OBD_FREE(op_data, sizeof(*op_data));
193
194         if (!rc) {
195                 /* here we save store cookie from master MDS to local 
196                  * inode EA. */
197                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
198
199                 rc = mds_update_mid(cmobd->cache_exp->exp_obd, &lid,
200                                     &body->id1, sizeof(body->id1));
201         }
202
203         if (req)
204                 ptlrpc_req_finished(req);
205         RETURN(rc);
206 }
207
208 static int cmobd_reint_unlink(struct obd_device *obd, void *record)
209 {
210         struct cm_obd *cmobd = &obd->u.cm;
211         struct ptlrpc_request *req = NULL;
212         struct mds_kml_pack_info *mkpi;
213         struct mdc_op_data *op_data;
214         struct mds_rec_unlink *rec;
215         struct lustre_msg *msg;
216         int rc = 0, namelen;
217         char *name = NULL;
218         ENTRY;
219         
220         mkpi = (struct mds_kml_pack_info *)record;
221         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
222
223         rec = lustre_msg_buf(msg, 0, 0);
224         if (!rec) 
225                 RETURN(-EINVAL);
226
227         /* converting local store cookie to remote lustre_id. */
228         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->ul_id1,
229                           &rec->ul_id1, sizeof(rec->ul_id1));
230         if (rc) {
231                 CERROR("Can't read master MDS store cookie "
232                        "from local inode EA, err = %d.\n", rc);
233                 RETURN(rc);
234         }
235
236         /* getting name to be created and its length */
237         name = lustre_msg_string(msg, 1, 0);
238         namelen = name ? msg->buflens[1] - 1 : 0;
239
240         OBD_ALLOC(op_data, sizeof(*op_data));
241         if (op_data == NULL)
242                 RETURN(-ENOMEM);
243
244         /* prepare mdc request data. */
245         cmobd_prepare_mdc_data(op_data, &rec->ul_id1, NULL,
246                                name, namelen, rec->ul_mode);
247
248         rc = md_unlink(cmobd->master_exp, op_data, &req);
249         OBD_FREE(op_data, sizeof(*op_data));
250
251         if (req)
252                 ptlrpc_req_finished(req);
253         RETURN(rc);
254 }
255
256 static int cmobd_reint_link(struct obd_device *obd, void *record)
257 {
258         struct cm_obd *cmobd = &obd->u.cm;
259         struct ptlrpc_request *req = NULL;
260         struct mds_kml_pack_info *mkpi;
261         struct mdc_op_data *op_data;
262         struct mds_rec_link *rec;
263         struct lustre_msg *msg;
264         int rc = 0, namelen;
265         char *name;
266         ENTRY;
267
268         mkpi = (struct mds_kml_pack_info *)record;
269         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
270         
271         rec = lustre_msg_buf(msg, 0, 0);
272         if (!rec) 
273                 RETURN(-EINVAL);
274
275         /* converting local store cookie for both ids to remote lustre_id. */
276         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->lk_id1,
277                           &rec->lk_id1, sizeof(rec->lk_id1));
278         if (rc) {
279                 CERROR("Can't read master MDS store cookie "
280                        "from local inode EA, err = %d.\n", rc);
281                 RETURN(rc);
282         }
283         
284         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->lk_id2,
285                           &rec->lk_id2, sizeof(rec->lk_id2));
286         if (rc) {
287                 CERROR("Can't read master MDS store cookie "
288                        "from local inode EA, err = %d.\n", rc);
289                 RETURN(rc);
290         }
291         
292         /* getting name to be created and its length */
293         name = lustre_msg_string(msg, 1, 0);
294         namelen = name ? msg->buflens[1] - 1: 0;
295
296         OBD_ALLOC(op_data, sizeof(*op_data));
297         if (op_data == NULL)
298                 RETURN(-ENOMEM);
299
300         /* prepare mdc request data. */
301         cmobd_prepare_mdc_data(op_data, &rec->lk_id1, &rec->lk_id2,
302                                name, namelen, 0);
303
304         rc = md_link(cmobd->master_exp, op_data, &req);
305         OBD_FREE(op_data, sizeof(*op_data));
306
307         if (req)
308                 ptlrpc_req_finished(req);
309         RETURN(rc);
310 }
311
312 static int cmobd_reint_rename(struct obd_device *obd, void *record)
313 {
314         struct cm_obd *cmobd = &obd->u.cm;
315         struct ptlrpc_request *req = NULL;
316         struct mds_kml_pack_info *mkpi;
317         struct mdc_op_data *op_data;
318         struct mds_rec_rename *rec;
319         int rc = 0, oldlen, newlen;
320         struct lustre_msg *msg;
321         char *old, *new;
322         ENTRY;
323
324         mkpi = (struct mds_kml_pack_info *)record;
325         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
326         
327         rec = lustre_msg_buf(msg, 0, 0);
328         if (!rec) 
329                 RETURN(-EINVAL);
330         
331         /* converting local store cookie for both ids to remote lustre_id. */
332         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->rn_id1,
333                           &rec->rn_id1, sizeof(rec->rn_id1));
334         if (rc) {
335                 CERROR("Can't read master MDS store cookie "
336                        "from local inode EA, err = %d.\n", rc);
337                 RETURN(rc);
338         }
339         
340         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->rn_id2,
341                           &rec->rn_id2, sizeof(rec->rn_id2));
342         if (rc) {
343                 CERROR("Can't read master MDS store cookie "
344                        "from local inode EA, err = %d.\n", rc);
345                 RETURN(rc);
346         }
347
348         /* getting old name and its length */
349         old = lustre_msg_string(msg, 1, 0);
350         oldlen = old ? msg->buflens[1] - 1 : 0;
351
352         /* getting new len and its length */
353         new = lustre_msg_string(msg, 2, 0);
354         newlen = new ? msg->buflens[2] - 1: 0;
355         
356         OBD_ALLOC(op_data, sizeof(*op_data));
357         if (op_data == NULL)
358                 RETURN(-ENOMEM);
359         
360         /* prepare mdc request data. */
361         cmobd_prepare_mdc_data(op_data, &rec->rn_id1, &rec->rn_id1,
362                                NULL, 0, 0);
363
364         rc = md_rename(cmobd->master_exp, op_data, old, oldlen,
365                        new, newlen, &req);
366         OBD_FREE(op_data, sizeof(*op_data));
367
368         if (req)
369                 ptlrpc_req_finished(req);
370         RETURN(rc);
371 }
372
373 typedef int (*cmobd_reint_rec_func_t)(struct obd_device *, void *);
374
375 static cmobd_reint_rec_func_t mds_reint_handler[REINT_MAX + 1] = {
376         [REINT_SETATTR] cmobd_reint_setattr,
377         [REINT_CREATE] cmobd_reint_create,
378         [REINT_LINK] cmobd_reint_link,
379         [REINT_UNLINK] cmobd_reint_unlink,
380         [REINT_RENAME] cmobd_reint_rename,
381 };
382
383 int cmobd_reint_mds(struct obd_device *obd, void *record, int dummy)
384 {
385         struct mds_kml_pack_info *mkpi;
386         struct lustre_msg *msg;
387         __u32 opcode;
388         
389         mkpi = (struct mds_kml_pack_info *)record;
390         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
391         
392         opcode = *(__u32 *)lustre_msg_buf(msg, 0, 0);
393         
394         if (opcode > REINT_MAX || opcode <= 0) {
395                 CERROR("Invalid mds reint opcode %u\n",
396                        opcode);
397                 return -EINVAL;
398         }
399         
400         return mds_reint_handler[opcode](obd, record);
401