Whamcloud - gitweb
- b_size_on_mds landed on HEAD:
[fs/lustre-release.git] / lustre / cmobd / cm_mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define DEBUG_SUBSYSTEM S_CMOBD
23
24 #include <linux/config.h>
25 #include <linux/module.h>
26 #include <linux/kernel.h>
27 #include <linux/obd_class.h>
28 #include <linux/lustre_net.h>
29 #include <linux/lustre_mds.h>
30 #include <linux/lustre_smfs.h>
31
32 #include "cm_internal.h"
33
34 /* converts mds_rec_setattr to struct iattr. */
35 static inline void cmobd_rec2iattr(struct mds_rec_setattr *rec,
36                                    struct iattr *iattr)
37 {
38         iattr->ia_uid = rec->sa_uid;
39         iattr->ia_gid = rec->sa_gid;
40         iattr->ia_mode = rec->sa_mode;
41         iattr->ia_size = rec->sa_size;
42         iattr->ia_valid = rec->sa_valid;
43         LTIME_S(iattr->ia_atime) = rec->sa_atime;
44         LTIME_S(iattr->ia_mtime) = rec->sa_mtime;
45         LTIME_S(iattr->ia_ctime) = rec->sa_ctime;
46         iattr->ia_attr_flags = rec->sa_attr_flags;
47 }
48
49 static void
50 cmobd_prepare_mdc_data(struct mdc_op_data *data, struct lustre_id *id1,
51                        struct lustre_id *id2, const char *name,
52                        int namelen, __u32 mode)
53 {
54         LASSERT(id1);
55         LASSERT(data);
56
57         memset(data, 0, sizeof(*data));
58
59         data->id1 = *id1;
60         if (id2)
61                 data->id2 = *id2;
62         else
63                 memset(&data->id2, 0, sizeof(data->id2));
64
65         data->valid = 0;
66         data->name = name;
67         data->namelen = namelen;
68         data->create_mode = mode;
69         data->mod_time = LTIME_S(CURRENT_TIME);
70 }
71
72 /* If mdc_setattr() is called with an 'iattr', then it is a normal RPC that
73  * should take the normal semaphore and go to the normal portal.
74  *
75  * If it is called with iattr->ia_valid & ATTR_FROM_OPEN, then it is a magic
76  * open-path setattr that should take the setattr semaphore and go to the
77  * setattr portal. */
78 static int cmobd_reint_setattr(struct obd_device *obd, void *record)
79 {
80         struct cm_obd *cmobd = &obd->u.cm;
81         struct ptlrpc_request *req = NULL;
82         struct mds_kml_pack_info *mkpi;
83         struct mds_rec_setattr *rec;
84         struct mdc_op_data *op_data;
85         struct lustre_msg *msg;
86         int ea1len, ea2len;
87         struct iattr iattr;
88         void *ea1, *ea2;
89         int rc = 0;
90         ENTRY;
91
92         mkpi = (struct mds_kml_pack_info *)record;
93         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
94
95         rec = lustre_msg_buf(msg, 0, 0);
96         if (!rec) 
97                 RETURN(-EINVAL);
98
99         /* converting setattr rec to struct iattr. */
100         cmobd_rec2iattr(rec, &iattr);
101
102         /* FIXME-UMKA: here should be handling of setattr() from open. Bug
103          * #249. Will be fixed later. */
104
105         /* converting localstore cookie to remote lustre_id. */
106         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->sa_id,
107                           &rec->sa_id, sizeof(rec->sa_id));
108         if (rc) {
109                 CERROR("Can't read master MDS store cookie "
110                        "from local inode EA, err = %d.\n", rc);
111                 RETURN(rc);
112         }
113
114         OBD_ALLOC(op_data, sizeof(*op_data));
115         if (op_data == NULL)
116                 RETURN(-ENOMEM);
117         cmobd_prepare_mdc_data(op_data, &rec->sa_id, NULL,
118                                NULL, 0, 0);
119
120         /* handling possible EAs. */
121         ea1 = lustre_msg_buf(msg, 1, 0);
122         ea1len = ea1 ? msg->buflens[1] : 0;
123
124         ea2 = lustre_msg_buf(msg, 2, 0);
125         ea2len = ea2 ? msg->buflens[2] : 0;
126
127         rc = md_setattr(cmobd->master_exp, op_data, &iattr,
128                         ea1, ea1len, ea2, ea2len, &req);
129         OBD_FREE(op_data, sizeof(*op_data));
130
131         if (req)
132                 ptlrpc_req_finished(req);
133         RETURN(rc);
134 }
135  
136 static int cmobd_reint_create(struct obd_device *obd, void *record)
137 {
138         struct cm_obd *cmobd = &obd->u.cm;
139         struct ptlrpc_request *req = NULL;
140         struct mds_kml_pack_info *mkpi;
141         int rc = 0, namelen, datalen, alloc = 0;
142         struct mds_rec_create *rec;
143         struct mdc_op_data *op_data;
144         struct lustre_msg *msg;
145         struct mds_body *body;
146         struct lustre_id lid;
147         char *name, *data;
148         ENTRY;
149
150         mkpi = (struct mds_kml_pack_info *)record;
151         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
152
153         rec = lustre_msg_buf(msg, 0, 0);
154         if (!rec) 
155                 RETURN(-EINVAL);
156         
157         lid = rec->cr_replayid;
158
159         /* converting local inode store cookie to remote lustre_id. */
160         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->cr_id,
161                           &rec->cr_id, sizeof(rec->cr_id));
162         if (rc) {
163                 CERROR("Can't read master MDS store cookie "
164                        "from local inode EA, err = %d.\n", rc);
165                 RETURN(rc);
166         }
167
168         /* getting name to be created and its length */
169         name = lustre_msg_string(msg, 1, 0);
170         namelen = name ? msg->buflens[1] - 1 : 0;
171   
172         /* getting misc data (symlink) and its length */
173         data = (char *)lustre_msg_buf(msg, 2, 0);
174         datalen = data ? msg->buflens[2] : 0;       
175
176         OBD_ALLOC(op_data, sizeof(*op_data));
177         if (op_data == NULL) 
178                 GOTO(exit, rc = -ENOMEM);
179  
180         /* zeroing @rec->cr_replayid out in request, as master MDS should create
181          * own inode (with own store cookie). */
182         memset(&rec->cr_replayid, 0, sizeof(rec->cr_replayid));
183        
184         /* prepare mdc request data. */
185         cmobd_prepare_mdc_data(op_data, &rec->cr_id, &rec->cr_replayid,
186                                name, namelen, rec->cr_mode);
187
188         /* requesting to master to create object with passed attributes. */
189         rc = md_create(cmobd->master_exp, op_data, data, datalen,
190                        rec->cr_mode, current->fsuid, current->fsgid,
191                        rec->cr_rdev, &req);
192         OBD_FREE(op_data, sizeof(*op_data));
193
194         if (!rc) {
195                 /* here we save store cookie from master MDS to local 
196                  * inode EA. */
197                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
198
199                 rc = mds_update_mid(cmobd->cache_exp->exp_obd, &lid,
200                                     &body->id1, sizeof(body->id1));
201         }
202 exit:
203         if (req)
204                 ptlrpc_req_finished(req);
205         
206         if (alloc == 1)
207                 OBD_FREE(data, datalen);
208         
209         RETURN(rc);
210 }
211
212 static int cmobd_reint_unlink(struct obd_device *obd, void *record)
213 {
214         struct cm_obd *cmobd = &obd->u.cm;
215         struct ptlrpc_request *req = NULL;
216         struct mds_kml_pack_info *mkpi;
217         struct mdc_op_data *op_data;
218         struct mds_rec_unlink *rec;
219         struct lustre_msg *msg;
220         int rc = 0, namelen;
221         char *name = NULL;
222         ENTRY;
223         
224         mkpi = (struct mds_kml_pack_info *)record;
225         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
226
227         rec = lustre_msg_buf(msg, 0, 0);
228         if (!rec) 
229                 RETURN(-EINVAL);
230
231         /* converting local store cookie to remote lustre_id. */
232         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->ul_id1,
233                           &rec->ul_id1, sizeof(rec->ul_id1));
234         if (rc) {
235                 CERROR("Can't read master MDS store cookie "
236                        "from local inode EA, err = %d.\n", rc);
237                 RETURN(rc);
238         }
239
240         /* getting name to be created and its length */
241         name = lustre_msg_string(msg, 1, 0);
242         namelen = name ? msg->buflens[1] - 1 : 0;
243
244         OBD_ALLOC(op_data, sizeof(*op_data));
245         if (op_data == NULL)
246                 RETURN(-ENOMEM);
247
248         /* prepare mdc request data. */
249         cmobd_prepare_mdc_data(op_data, &rec->ul_id1, NULL,
250                                name, namelen, rec->ul_mode);
251
252         rc = md_unlink(cmobd->master_exp, op_data, &req);
253         OBD_FREE(op_data, sizeof(*op_data));
254
255         if (req)
256                 ptlrpc_req_finished(req);
257         RETURN(rc);
258 }
259
260 static int cmobd_reint_link(struct obd_device *obd, void *record)
261 {
262         struct cm_obd *cmobd = &obd->u.cm;
263         struct ptlrpc_request *req = NULL;
264         struct mds_kml_pack_info *mkpi;
265         struct mdc_op_data *op_data;
266         struct mds_rec_link *rec;
267         struct lustre_msg *msg;
268         int rc = 0, namelen;
269         char *name;
270         ENTRY;
271
272         mkpi = (struct mds_kml_pack_info *)record;
273         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
274         
275         rec = lustre_msg_buf(msg, 0, 0);
276         if (!rec) 
277                 RETURN(-EINVAL);
278
279         /* converting local store cookie for both ids to remote lustre_id. */
280         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->lk_id1,
281                           &rec->lk_id1, sizeof(rec->lk_id1));
282         if (rc) {
283                 CERROR("Can't read master MDS store cookie "
284                        "from local inode EA, err = %d.\n", rc);
285                 RETURN(rc);
286         }
287         
288         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->lk_id2,
289                           &rec->lk_id2, sizeof(rec->lk_id2));
290         if (rc) {
291                 CERROR("Can't read master MDS store cookie "
292                        "from local inode EA, err = %d.\n", rc);
293                 RETURN(rc);
294         }
295         
296         /* getting name to be created and its length */
297         name = lustre_msg_string(msg, 1, 0);
298         namelen = name ? msg->buflens[1] - 1: 0;
299
300         OBD_ALLOC(op_data, sizeof(*op_data));
301         if (op_data == NULL)
302                 RETURN(-ENOMEM);
303
304         /* prepare mdc request data. */
305         cmobd_prepare_mdc_data(op_data, &rec->lk_id1, &rec->lk_id2,
306                                name, namelen, 0);
307
308         rc = md_link(cmobd->master_exp, op_data, &req);
309         OBD_FREE(op_data, sizeof(*op_data));
310
311         if (req)
312                 ptlrpc_req_finished(req);
313         RETURN(rc);
314 }
315
316 static int cmobd_reint_rename(struct obd_device *obd, void *record)
317 {
318         struct cm_obd *cmobd = &obd->u.cm;
319         struct ptlrpc_request *req = NULL;
320         struct mds_kml_pack_info *mkpi;
321         struct mdc_op_data *op_data;
322         struct mds_rec_rename *rec;
323         int rc = 0, oldlen, newlen;
324         struct lustre_msg *msg;
325         char *old, *new;
326         ENTRY;
327
328         mkpi = (struct mds_kml_pack_info *)record;
329         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
330         
331         rec = lustre_msg_buf(msg, 0, 0);
332         if (!rec) 
333                 RETURN(-EINVAL);
334         
335         /* converting local store cookie for both ids to remote lustre_id. */
336         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->rn_id1,
337                           &rec->rn_id1, sizeof(rec->rn_id1));
338         if (rc) {
339                 CERROR("Can't read master MDS store cookie "
340                        "from local inode EA, err = %d.\n", rc);
341                 RETURN(rc);
342         }
343         
344         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->rn_id2,
345                           &rec->rn_id2, sizeof(rec->rn_id2));
346         if (rc) {
347                 CERROR("Can't read master MDS store cookie "
348                        "from local inode EA, err = %d.\n", rc);
349                 RETURN(rc);
350         }
351
352         /* getting old name and its length */
353         old = lustre_msg_string(msg, 1, 0);
354         oldlen = old ? msg->buflens[1] - 1 : 0;
355
356         /* getting new len and its length */
357         new = lustre_msg_string(msg, 2, 0);
358         newlen = new ? msg->buflens[2] - 1: 0;
359         
360         OBD_ALLOC(op_data, sizeof(*op_data));
361         if (op_data == NULL)
362                 RETURN(-ENOMEM);
363         
364         /* prepare mdc request data. */
365         cmobd_prepare_mdc_data(op_data, &rec->rn_id1, &rec->rn_id1,
366                                NULL, 0, 0);
367
368         rc = md_rename(cmobd->master_exp, op_data, old, oldlen,
369                        new, newlen, &req);
370         OBD_FREE(op_data, sizeof(*op_data));
371
372         if (req)
373                 ptlrpc_req_finished(req);
374         RETURN(rc);
375 }
376
377 typedef int (*cmobd_reint_rec_func_t)(struct obd_device *, void *);
378
379 static cmobd_reint_rec_func_t mds_reint_handler[REINT_MAX + 1] = {
380         [REINT_SETATTR] cmobd_reint_setattr,
381         [REINT_CREATE] cmobd_reint_create,
382         [REINT_LINK] cmobd_reint_link,
383         [REINT_UNLINK] cmobd_reint_unlink,
384         [REINT_RENAME] cmobd_reint_rename,
385 };
386
387 int cmobd_reint_mds(struct obd_device *obd, void *record, int dummy)
388 {
389         struct mds_kml_pack_info *mkpi;
390         struct lustre_msg *msg;
391         __u32 opcode;
392         
393         mkpi = (struct mds_kml_pack_info *)record;
394         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
395         
396         opcode = *(__u32 *)lustre_msg_buf(msg, 0, 0);
397         
398         if (opcode > REINT_MAX || opcode <= 0) {
399                 CERROR("Invalid mds reint opcode %u\n",
400                        opcode);
401                 return -EINVAL;
402         }
403         
404         return mds_reint_handler[opcode](obd, record);
405