Whamcloud - gitweb
73b07d599339b7b3c30ff9bcee3e4dc89ba26fd5
[fs/lustre-release.git] / lustre / cmobd / cm_mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define DEBUG_SUBSYSTEM S_CMOBD
23
24 #include <linux/config.h>
25 #include <linux/module.h>
26 #include <linux/kernel.h>
27 #include <linux/obd_class.h>
28 #include <linux/lustre_net.h>
29 #include <linux/lustre_mds.h>
30 #include <linux/lustre_smfs.h>
31
32 #include "cm_internal.h"
33
34 /* converts mds_rec_setattr to struct iattr. */
35 static inline void cmobd_rec2iattr(struct mds_rec_setattr *rec,
36                                    struct iattr *iattr)
37 {
38         iattr->ia_uid = rec->sa_uid;
39         iattr->ia_gid = rec->sa_gid;
40         iattr->ia_mode = rec->sa_mode;
41         iattr->ia_size = rec->sa_size;
42         iattr->ia_valid = rec->sa_valid;
43         LTIME_S(iattr->ia_atime) = rec->sa_atime;
44         LTIME_S(iattr->ia_mtime) = rec->sa_mtime;
45         LTIME_S(iattr->ia_ctime) = rec->sa_ctime;
46         iattr->ia_attr_flags = rec->sa_attr_flags;
47 }
48
49 static void
50 cmobd_prepare_mdc_data(struct mdc_op_data *data, struct lustre_id *id1,
51                        struct lustre_id *id2, const char *name,
52                        int namelen, __u32 mode)
53 {
54         LASSERT(id1);
55         LASSERT(data);
56
57         memset(data, 0, sizeof(*data));
58
59         data->id1 = *id1;
60         if (id2)
61                 data->id2 = *id2;
62         else
63                 memset(&data->id2, 0, sizeof(data->id2));
64
65         data->valid = 0;
66         data->name = name;
67         data->namelen = namelen;
68         data->create_mode = mode;
69         data->mod_time = LTIME_S(CURRENT_TIME);
70 }
71
72 /* If mdc_setattr() is called with an 'iattr', then it is a normal RPC that
73  * should take the normal semaphore and go to the normal portal.
74  *
75  * If it is called with iattr->ia_valid & ATTR_FROM_OPEN, then it is a magic
76  * open-path setattr that should take the setattr semaphore and go to the
77  * setattr portal. */
78 static int cmobd_reint_setattr(struct obd_device *obd, void *record)
79 {
80         struct cm_obd *cmobd = &obd->u.cm;
81         struct ptlrpc_request *req = NULL;
82         struct mds_kml_pack_info *mkpi;
83         struct mds_rec_setattr *rec;
84         struct mdc_op_data *op_data;
85         struct lustre_msg *msg;
86         int ea1len, ea2len;
87         struct iattr iattr;
88         void *ea1, *ea2;
89         int rc = 0;
90         ENTRY;
91
92         mkpi = (struct mds_kml_pack_info *)record;
93         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
94
95         rec = lustre_msg_buf(msg, 0, 0);
96         if (!rec) 
97                 RETURN(-EINVAL);
98
99         /* converting setattr rec to struct iattr. */
100         cmobd_rec2iattr(rec, &iattr);
101
102         /* FIXME-UMKA: here should be handling of setattr() from open. Bug
103          * #249. Will be fixed later. */
104
105         /* converting localstore cookie to remote lustre_id. */
106         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->sa_id,
107                           &rec->sa_id, sizeof(rec->sa_id));
108         if (rc) {
109                 CERROR("Can't read master MDS store cookie "
110                        "from local inode EA, err = %d.\n", rc);
111                 RETURN(rc);
112         }
113
114         OBD_ALLOC(op_data, sizeof(*op_data));
115         if (op_data == NULL)
116                 RETURN(-ENOMEM);
117         cmobd_prepare_mdc_data(op_data, &rec->sa_id, NULL,
118                                NULL, 0, 0);
119
120         /* handling possible EAs. */
121         ea1 = lustre_msg_buf(msg, 1, 0);
122         ea1len = ea1 ? msg->buflens[1] : 0;
123
124         ea2 = lustre_msg_buf(msg, 2, 0);
125         ea2len = ea2 ? msg->buflens[2] : 0;
126
127         rc = md_setattr(cmobd->master_exp, op_data, &iattr,
128                         ea1, ea1len, ea2, ea2len, &req);
129         OBD_FREE(op_data, sizeof(*op_data));
130
131         if (req)
132                 ptlrpc_req_finished(req);
133         RETURN(rc);
134 }
135  
136 static int cmobd_reint_create(struct obd_device *obd, void *record)
137 {
138         struct cm_obd *cmobd = &obd->u.cm;
139         struct ptlrpc_request *req = NULL;
140         struct mds_kml_pack_info *mkpi;
141         int rc = 0, namelen, datalen, alloc = 0;
142         struct mds_rec_create *rec;
143         struct mdc_op_data *op_data;
144         struct lustre_msg *msg;
145         struct mds_body *body;
146         struct lustre_id lid;
147         char *name, *data;
148         ENTRY;
149
150         mkpi = (struct mds_kml_pack_info *)record;
151         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
152
153         rec = lustre_msg_buf(msg, 0, 0);
154         if (!rec) 
155                 RETURN(-EINVAL);
156         
157         lid = rec->cr_replayid;
158
159         /* converting local inode store cookie to remote lustre_id. */
160         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->cr_id,
161                           &rec->cr_id, sizeof(rec->cr_id));
162         if (rc) {
163                 CERROR("Can't read master MDS store cookie "
164                        "from local inode EA, err = %d.\n", rc);
165                 RETURN(rc);
166         }
167
168         /* getting name to be created and its length */
169         name = lustre_msg_string(msg, 1, 0);
170         namelen = name ? msg->buflens[1] - 1 : 0;
171   
172         /* getting misc data (symlink) and its length */
173         data = (char *)lustre_msg_buf(msg, 2, 0);
174         datalen = data ? msg->buflens[2] : 0;       
175
176         if (datalen == 0 && S_ISREG(rec->cr_mode)) {
177                 /*Get lov md from inode*/
178                 mds_read_md(cmobd->cache_exp->exp_obd, &rec->cr_replayid, 
179                             &data, &datalen);
180                 if (datalen > 0)
181                        alloc = 1; 
182         } 
183         OBD_ALLOC(op_data, sizeof(*op_data));
184         if (op_data == NULL) 
185                 GOTO(exit, rc = -ENOMEM);
186  
187         /* zeroing @rec->cr_replayid out in request, as master MDS should create
188          * own inode (with own store cookie). */
189         memset(&rec->cr_replayid, 0, sizeof(rec->cr_replayid));
190        
191         /* prepare mdc request data. */
192         cmobd_prepare_mdc_data(op_data, &rec->cr_id, &rec->cr_replayid,
193                                name, namelen, rec->cr_mode);
194
195         /* requesting to master to create object with passed attributes. */
196         rc = md_create(cmobd->master_exp, op_data, data, datalen,
197                        rec->cr_mode, current->fsuid, current->fsgid,
198                        rec->cr_rdev, &req);
199         OBD_FREE(op_data, sizeof(*op_data));
200
201         if (!rc) {
202                 /* here we save store cookie from master MDS to local 
203                  * inode EA. */
204                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
205
206                 rc = mds_update_mid(cmobd->cache_exp->exp_obd, &lid,
207                                     &body->id1, sizeof(body->id1));
208         }
209 exit:
210         if (req)
211                 ptlrpc_req_finished(req);
212         
213         if (alloc == 1)
214                 OBD_FREE(data, datalen);
215         
216         RETURN(rc);
217 }
218
219 static int cmobd_reint_unlink(struct obd_device *obd, void *record)
220 {
221         struct cm_obd *cmobd = &obd->u.cm;
222         struct ptlrpc_request *req = NULL;
223         struct mds_kml_pack_info *mkpi;
224         struct mdc_op_data *op_data;
225         struct mds_rec_unlink *rec;
226         struct lustre_msg *msg;
227         int rc = 0, namelen;
228         char *name = NULL;
229         ENTRY;
230         
231         mkpi = (struct mds_kml_pack_info *)record;
232         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
233
234         rec = lustre_msg_buf(msg, 0, 0);
235         if (!rec) 
236                 RETURN(-EINVAL);
237
238         /* converting local store cookie to remote lustre_id. */
239         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->ul_id1,
240                           &rec->ul_id1, sizeof(rec->ul_id1));
241         if (rc) {
242                 CERROR("Can't read master MDS store cookie "
243                        "from local inode EA, err = %d.\n", rc);
244                 RETURN(rc);
245         }
246
247         /* getting name to be created and its length */
248         name = lustre_msg_string(msg, 1, 0);
249         namelen = name ? msg->buflens[1] - 1 : 0;
250
251         OBD_ALLOC(op_data, sizeof(*op_data));
252         if (op_data == NULL)
253                 RETURN(-ENOMEM);
254
255         /* prepare mdc request data. */
256         cmobd_prepare_mdc_data(op_data, &rec->ul_id1, NULL,
257                                name, namelen, rec->ul_mode);
258
259         rc = md_unlink(cmobd->master_exp, op_data, &req);
260         OBD_FREE(op_data, sizeof(*op_data));
261
262         if (req)
263                 ptlrpc_req_finished(req);
264         RETURN(rc);
265 }
266
267 static int cmobd_reint_link(struct obd_device *obd, void *record)
268 {
269         struct cm_obd *cmobd = &obd->u.cm;
270         struct ptlrpc_request *req = NULL;
271         struct mds_kml_pack_info *mkpi;
272         struct mdc_op_data *op_data;
273         struct mds_rec_link *rec;
274         struct lustre_msg *msg;
275         int rc = 0, namelen;
276         char *name;
277         ENTRY;
278
279         mkpi = (struct mds_kml_pack_info *)record;
280         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
281         
282         rec = lustre_msg_buf(msg, 0, 0);
283         if (!rec) 
284                 RETURN(-EINVAL);
285
286         /* converting local store cookie for both ids to remote lustre_id. */
287         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->lk_id1,
288                           &rec->lk_id1, sizeof(rec->lk_id1));
289         if (rc) {
290                 CERROR("Can't read master MDS store cookie "
291                        "from local inode EA, err = %d.\n", rc);
292                 RETURN(rc);
293         }
294         
295         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->lk_id2,
296                           &rec->lk_id2, sizeof(rec->lk_id2));
297         if (rc) {
298                 CERROR("Can't read master MDS store cookie "
299                        "from local inode EA, err = %d.\n", rc);
300                 RETURN(rc);
301         }
302         
303         /* getting name to be created and its length */
304         name = lustre_msg_string(msg, 1, 0);
305         namelen = name ? msg->buflens[1] - 1: 0;
306
307         OBD_ALLOC(op_data, sizeof(*op_data));
308         if (op_data == NULL)
309                 RETURN(-ENOMEM);
310
311         /* prepare mdc request data. */
312         cmobd_prepare_mdc_data(op_data, &rec->lk_id1, &rec->lk_id2,
313                                name, namelen, 0);
314
315         rc = md_link(cmobd->master_exp, op_data, &req);
316         OBD_FREE(op_data, sizeof(*op_data));
317
318         if (req)
319                 ptlrpc_req_finished(req);
320         RETURN(rc);
321 }
322
323 static int cmobd_reint_rename(struct obd_device *obd, void *record)
324 {
325         struct cm_obd *cmobd = &obd->u.cm;
326         struct ptlrpc_request *req = NULL;
327         struct mds_kml_pack_info *mkpi;
328         struct mdc_op_data *op_data;
329         struct mds_rec_rename *rec;
330         int rc = 0, oldlen, newlen;
331         struct lustre_msg *msg;
332         char *old, *new;
333         ENTRY;
334
335         mkpi = (struct mds_kml_pack_info *)record;
336         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
337         
338         rec = lustre_msg_buf(msg, 0, 0);
339         if (!rec) 
340                 RETURN(-EINVAL);
341         
342         /* converting local store cookie for both ids to remote lustre_id. */
343         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->rn_id1,
344                           &rec->rn_id1, sizeof(rec->rn_id1));
345         if (rc) {
346                 CERROR("Can't read master MDS store cookie "
347                        "from local inode EA, err = %d.\n", rc);
348                 RETURN(rc);
349         }
350         
351         rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->rn_id2,
352                           &rec->rn_id2, sizeof(rec->rn_id2));
353         if (rc) {
354                 CERROR("Can't read master MDS store cookie "
355                        "from local inode EA, err = %d.\n", rc);
356                 RETURN(rc);
357         }
358
359         /* getting old name and its length */
360         old = lustre_msg_string(msg, 1, 0);
361         oldlen = old ? msg->buflens[1] - 1 : 0;
362
363         /* getting new len and its length */
364         new = lustre_msg_string(msg, 2, 0);
365         newlen = new ? msg->buflens[2] - 1: 0;
366         
367         OBD_ALLOC(op_data, sizeof(*op_data));
368         if (op_data == NULL)
369                 RETURN(-ENOMEM);
370         
371         /* prepare mdc request data. */
372         cmobd_prepare_mdc_data(op_data, &rec->rn_id1, &rec->rn_id1,
373                                NULL, 0, 0);
374
375         rc = md_rename(cmobd->master_exp, op_data, old, oldlen,
376                        new, newlen, &req);
377         OBD_FREE(op_data, sizeof(*op_data));
378
379         if (req)
380                 ptlrpc_req_finished(req);
381         RETURN(rc);
382 }
383
384 typedef int (*cmobd_reint_rec_func_t)(struct obd_device *, void *);
385
386 static cmobd_reint_rec_func_t mds_reint_handler[REINT_MAX + 1] = {
387         [REINT_SETATTR] cmobd_reint_setattr,
388         [REINT_CREATE] cmobd_reint_create,
389         [REINT_LINK] cmobd_reint_link,
390         [REINT_UNLINK] cmobd_reint_unlink,
391         [REINT_RENAME] cmobd_reint_rename,
392 };
393
394 int cmobd_reint_mds(struct obd_device *obd, void *record, int dummy)
395 {
396         struct mds_kml_pack_info *mkpi;
397         struct lustre_msg *msg;
398         __u32 opcode;
399         
400         mkpi = (struct mds_kml_pack_info *)record;
401         msg = (struct lustre_msg *)(record + sizeof(*mkpi));
402         
403         opcode = *(__u32 *)lustre_msg_buf(msg, 0, 0);
404         
405         if (opcode > REINT_MAX || opcode <= 0) {
406                 CERROR("Invalid mds reint opcode %u\n",
407                        opcode);
408                 return -EINVAL;
409         }
410         
411         return mds_reint_handler[opcode](obd, record);
412