Whamcloud - gitweb
- don't call obd_set_info() on the NULL mdc export on rename.
[fs/lustre-release.git] / lustre / mds / mds_audit_path.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *
5  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #ifndef EXPORT_SYMTAB
24 # define EXPORT_SYMTAB
25 #endif
26 #define DEBUG_SUBSYSTEM S_MDS
27
28 #include <linux/module.h>
29 #include <linux/lustre_mds.h>
30 #include <linux/lustre_dlm.h>
31 #include <linux/lustre_fsfilt.h>
32 #include <linux/init.h>
33 #include <linux/obd_class.h>
34 #include <linux/fs.h>
35 #include <linux/namei.h>
36 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
37 # include <linux/smp_lock.h>
38 # include <linux/buffer_head.h>
39 # include <linux/workqueue.h>
40 # include <linux/mount.h>
41 #else
42 # include <linux/locks.h>
43 #endif
44 #include <linux/lustre_audit.h>
45 #include "mds_internal.h"
46
47
/* Values of parseid_pkg::pp_type: how an id relates to the id returned
 * as its "parent" (see id2pid() and local_parse_id()). */
#define PP_FILE         1       /* non-directory; parent id read from the inode */
#define PP_DIR          2       /* directory without split (mea) attributes */
#define PP_SPLIT_MASTER 3       /* directory with mea and non-zero mea_count */
#define PP_SPLIT_SLAVE  4       /* directory with mea and zero mea_count */
#define PP_CROSS_DIR    5       /* dir/split master whose parent is in another group */
#define PP_AUDIT_LOG    6       /* search id in audit log */
54
/* State passed to filldir() through vfs_readdir() while looking for the
 * directory entry that refers to a given id. */
struct scan_dir_data {
        int               rc;   /* preset by the caller; filldir() sets 0 on a match */
        struct lustre_id *id;   /* id whose inode number is searched for */
        char             *name; /* caller's buffer receiving the entry name */
};
60
61 static int filldir(void *__buf, const char *name, int namlen,
62                    loff_t offset, ino_t ino, unsigned int d_type)
63 {
64         struct scan_dir_data *sd = __buf;
65         ENTRY;
66
67         if (name[0] == '.' &&
68             (namlen == 1 || (namlen == 2 && name[1] == '.'))) {
69                 /* skip special entries */
70                 RETURN(0);
71         }
72
73         LASSERT(sd != NULL);
74
75         if (ino == id_ino(sd->id)) {
76                 strncpy(sd->name, name, namlen);
77                 sd->rc = 0;
78                 RETURN(-EINTR); /* break the readdir loop */
79         }
80         RETURN(0);
81 }
82
83 int 
84 scan_name_in_parent(struct lustre_id *pid, struct lustre_id *id, char *name)
85 {
86         struct file * file;
87         char *pname;
88         struct scan_dir_data sd;
89         int len, rc = 0;
90         ENTRY;
91
92         len = strlen("__iopen__/") + 10 + 1;
93         OBD_ALLOC(pname, len);
94         if (!pname)
95                 RETURN(-ENOMEM);
96         
97         sprintf(pname, "__iopen__/0x%llx", id_ino(pid));
98
99         file = filp_open(pname, O_RDONLY, 0);
100         if (IS_ERR(file)) {
101                 CERROR("can't open directory %s: %d\n",
102                        pname, (int) PTR_ERR(file));
103                 GOTO(out, rc = PTR_ERR(file));
104         }
105         
106         sd.id = id;
107         sd.name = name;
108         sd.rc = -ENOENT;
109         vfs_readdir(file, filldir, &sd);
110
111         filp_close(file, 0);
112         rc = sd.rc;
113
114 out:
115         OBD_FREE(pname, len);
116         RETURN(rc);
117
118 }
119
120 /* id2pid - given id, get parent id or master id.
121  * @obd:   obd device
122  * @id:    child id to be parsed
123  * @pid:   parent id or master id
124  * @type:  id type
125  */
126 static int 
127 id2pid(struct obd_device *obd, struct lustre_id *id, struct lustre_id *pid, 
128        __u32 *type)
129 {
130         struct dentry *dentry = NULL;
131         struct inode *inode = NULL;
132         struct mea *mea = NULL;
133         int mea_size, rc = 0;
134         ENTRY;
135         
136         dentry = mds_id2dentry(obd, id, NULL);
137         if (IS_ERR(dentry) || !dentry->d_inode) {
138                 CERROR("can't find inode "LPU64"\n", id_ino(id));
139                 if (!IS_ERR(dentry)) l_dput(dentry);
140                 RETURN(-ENOENT);
141         }
142         inode = dentry->d_inode;
143
144         if (S_ISDIR(id_type(id))) {
145                 LASSERT(S_ISDIR(inode->i_mode));
146                 rc = mds_md_get_attr(obd, inode, &mea, &mea_size);
147                 if (rc)
148                         GOTO(out, rc);
149                 
150                 if (!mea) {
151                         *type = PP_DIR;
152                         goto read_pid;
153                 } else if (mea && mea->mea_count) {
154                         *type = PP_SPLIT_MASTER;
155                         goto read_pid;
156                 } else {
157                         *type = PP_SPLIT_SLAVE;
158                         *pid = mea->mea_ids[mea->mea_master];
159                 }
160                                 
161         } else {
162                 LASSERT(!S_ISDIR(inode->i_mode));
163                 *type = PP_FILE;
164 read_pid:
165                 rc = mds_read_inode_pid(obd, inode, pid);
166                 if (rc) {
167                         CERROR("can't read parent ino(%lu) rc(%d).\n",
168                                inode->i_ino, rc);
169                         GOTO(out, rc);
170                 }
171         }
172
173         /* Well, if it's dir or master split, we have to check if it's 
174          * a cross-ref dir */
175         if ((*type == PP_DIR || *type == PP_SPLIT_MASTER) &&
176              id_group(id) != id_group(pid))
177                 *type = PP_CROSS_DIR;
178 out:
179         if (mea)
180                 OBD_FREE(mea, mea_size);
181         l_dput(dentry);
182         RETURN(rc);
183 }
184
185 static int local_parse_id(struct obd_device *obd, struct parseid_pkg *pkg)
186 {
187         struct lvfs_run_ctxt saved;
188         int rc = 0;
189         ENTRY;
190
191         pkg->pp_rc = 0;
192         pkg->pp_type = 0;
193         memset(pkg->pp_name, 0, sizeof(pkg->pp_name));
194
195         LASSERT(obd->u.mds.mds_num == id_group(&pkg->pp_id1));
196
197         /* pp_id2 is present, which indicating we want to scan parent 
198          * dir(pp_id2) to find the cross-ref entry(pp_id1) */
199         if (id_fid(&pkg->pp_id2)) {
200                 LASSERT(S_ISDIR(id_type(&pkg->pp_id1)));
201                 LASSERT(S_ISDIR(id_type(&pkg->pp_id2)));
202
203                 pkg->pp_type = PP_DIR;
204                 goto scan;
205         }
206
207         rc = id2pid(obd, &pkg->pp_id1, &pkg->pp_id2, &pkg->pp_type);
208         if (rc)
209                 GOTO(out, rc);
210 scan:
211         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
212         
213         switch (pkg->pp_type) {
214         case PP_FILE:
215         case PP_DIR:
216         case PP_SPLIT_MASTER:
217                 rc = scan_name_in_parent(&pkg->pp_id2, &pkg->pp_id1,
218                                          pkg->pp_name);
219                 if (rc) 
220                         CERROR("scan "LPU64" in parent failed. rc=%d\n",
221                                id_ino(&pkg->pp_id1), rc);
222                 break;
223         case PP_SPLIT_SLAVE:
224         case PP_CROSS_DIR:
225                 break;
226         default:
227                 CERROR("invalid id\n");
228                 break;
229         }
230
231         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
232 out:
233         pkg->pp_rc = rc;
234         RETURN(rc);
235 }
236
237 static int 
238 local_scan_audit_log(struct obd_device *obd, struct parseid_pkg *pkg);
239
240 int mds_parse_id(struct ptlrpc_request *req)
241 {
242         struct parseid_pkg *pkg, *reppkg;
243         struct obd_device *obd = req->rq_export->exp_obd;
244         int rc = 0, size = sizeof(*reppkg);
245         ENTRY;
246
247         pkg = lustre_swab_reqbuf(req, 0, sizeof(*pkg), 
248                                  lustre_swab_parseid_pkg);
249         if (pkg == NULL)
250                 RETURN(-EPROTO);
251
252         rc = lustre_pack_reply(req, 1, &size, NULL);
253         if (rc)
254                 RETURN(rc);
255         
256         reppkg = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reppkg));
257         memcpy(reppkg, pkg, sizeof(*reppkg));
258
259         if (reppkg->pp_type == PP_AUDIT_LOG)
260                 rc = local_scan_audit_log(obd, reppkg);
261         else
262                 rc = local_parse_id(obd, reppkg);
263         
264         if (rc)
265                 CERROR("local parseid failed. (rc:%d)\n", rc);
266         RETURN(0);  /* we do need pack reply here */
267 }
268
269 static int parse_id(struct obd_device *obd, struct parseid_pkg *pkg)
270 {
271         int rc = 0;
272         int mds_num = id_group(&pkg->pp_id1);
273         ENTRY;
274         
275         LASSERT(mds_num >= 0);
276
277         if (mds_num == obd->u.mds.mds_num) {
278                 rc = local_parse_id(obd, pkg);
279         } else {
280                 struct ptlrpc_request *req;
281                 struct lmv_obd *lmv = &obd->u.mds.mds_md_obd->u.lmv;
282                 struct parseid_pkg *body;
283                 int size = sizeof(*body);
284                 struct obd_export *exp;
285                 
286                 /* make sure connection established */
287                 rc = obd_set_info(obd->u.mds.mds_md_exp, strlen("chkconnect"),
288                                   "chkconnect", 0, NULL);
289                 if (rc)
290                         RETURN(rc);
291
292                 exp = lmv->tgts[mds_num].ltd_exp;
293                 LASSERT(exp);
294
295                 req = ptlrpc_prep_req(class_exp2cliimp(exp), 
296                                       LUSTRE_MDS_VERSION, MDS_PARSE_ID, 1, 
297                                       &size, NULL);
298                 if (!req)
299                         RETURN(-ENOMEM);
300
301                 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
302                 memcpy(body, pkg, sizeof(*body));
303                 
304                 req->rq_replen = lustre_msg_size(1, &size);
305
306                 rc = ptlrpc_queue_wait(req);
307                 if (rc)
308                         GOTO(out, rc);
309
310                 body = lustre_swab_repbuf(req, 0, sizeof(*body), 
311                                           lustre_swab_parseid_pkg);
312                 if (body == NULL) {
313                         CERROR("can't unpack parseid_pkg\n");
314                         GOTO(out, rc = -EPROTO);
315                 }
316                 memcpy(pkg, body, sizeof(*pkg));
317 out:
318                 ptlrpc_req_finished(req);
319         }
320         RETURN(rc);
321 }
322
/* fid value at which the upward walk in mds_id2name() stops */
#define ROOT_FID        2
/* One path component collected while walking from an id towards the
 * root; items are chained on a caller-provided list. */
struct name_item {
        struct list_head link;                  /* list linkage */
        char             name[NAME_MAX + 1];    /* component name */
};
328
329 int 
330 mds_id2name(struct obd_device *obd, struct lustre_id *id, 
331             struct list_head *list, struct lustre_id *lastid)
332 {
333         struct name_item *item;
334         struct parseid_pkg *pkg;
335         int rc = 0;
336         ENTRY;
337
338         OBD_ALLOC(pkg, sizeof(*pkg));
339         if (pkg == NULL)
340                 RETURN(-ENOMEM);
341
342         pkg->pp_id1 = *id;
343         while (id_fid(&pkg->pp_id1) != ROOT_FID) {
344                 
345                 rc = parse_id(obd, pkg);
346                 if (rc) {
347                         CDEBUG(D_SEC, "parse id failed. rc=%d\n", rc);
348                         *lastid = pkg->pp_id1;
349                         break;
350                 }
351
352                 switch (pkg->pp_type) {
353                 case PP_FILE:
354                 case PP_DIR:
355                 case PP_SPLIT_MASTER:
356                         OBD_ALLOC(item, sizeof(*item));
357                         if (item == NULL)
358                                 GOTO(out, rc = -ENOMEM);
359                         
360                         INIT_LIST_HEAD(&item->link);
361                         list_add(&item->link, list);
362                         memcpy(item->name, pkg->pp_name, sizeof(item->name));
363                         
364                 case PP_SPLIT_SLAVE:
365                         pkg->pp_id1 = pkg->pp_id2;
366                         memset(&pkg->pp_id2, 0, sizeof(struct lustre_id));
367                         break;
368                 case PP_CROSS_DIR:
369                         break;
370                 default:
371                         LBUG();
372                         break;
373                 }
374                 
375         }
376 out:
377         OBD_FREE(pkg, sizeof(*pkg));
378         RETURN(rc);
379 }
380
381 static int
382 scan_audit_log_cb(struct llog_handle *llh, struct llog_rec_hdr *rec, void *data)
383 {
384         struct parseid_pkg *pkg = (struct parseid_pkg *)data;
385         struct audit_record *ad_rec; 
386         struct audit_id_record *cid_rec, *pid_rec;
387         struct audit_name_record *nm_rec;
388         ENTRY;
389
390         if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
391                 CERROR("log is not plain\n");
392                 RETURN(-EINVAL);
393         }
394         if (rec->lrh_type != SMFS_AUDIT_NAME_REC &&
395             rec->lrh_type != LLOG_GEN_REC) {
396                 RETURN(0);
397         }
398
399         ad_rec = (struct audit_record *)((char *)rec + sizeof(*rec));
400
401         if (ad_rec->result || 
402             ad_rec->opcode != AUDIT_UNLINK ||
403             ad_rec->opcode != AUDIT_RENAME)
404                 RETURN(0);
405
406         cid_rec = (struct audit_id_record *)((char *)ad_rec + sizeof(*ad_rec));
407         pid_rec = cid_rec + 1;
408         nm_rec = (struct audit_name_record *)
409                 ((char *)pid_rec + sizeof(*pid_rec));
410         
411         if (cid_rec->au_num == id_ino(&pkg->pp_id1) &&
412             cid_rec->au_gen == id_gen(&pkg->pp_id1)) {
413                 /* get parent id */
414                 id_ino(&pkg->pp_id2) = pid_rec->au_num;
415                 id_gen(&pkg->pp_id2) = pid_rec->au_gen;
416                 id_type(&pkg->pp_id2) = pid_rec->au_type;
417                 id_fid(&pkg->pp_id2) = pid_rec->au_fid;
418                 id_group(&pkg->pp_id2) = pid_rec->au_mds;
419                 /* get name */
420                 memcpy(pkg->pp_name, nm_rec->name, 
421                        le32_to_cpu(nm_rec->name_len));
422
423                 RETURN(LLOG_PROC_BREAK);
424         }
425         RETURN(0);
426 }
427
428 static int
429 local_scan_audit_log(struct obd_device *obd, struct parseid_pkg *pkg)
430 {
431         struct llog_handle *llh = NULL;
432         struct llog_ctxt *ctxt = llog_get_context(&obd->obd_llogs,
433                                                   LLOG_AUDIT_ORIG_CTXT);
434         int rc = 0;
435         ENTRY;
436
437         if (ctxt)
438                 llh = ctxt->loc_handle;
439
440         if (llh == NULL)
441                 RETURN(-ENOENT);
442
443         rc = llog_cat_process(llh, (llog_cb_t)&scan_audit_log_cb, (void *)pkg);
444         if (rc != LLOG_PROC_BREAK) {
445                 CWARN("process catalog log failed: rc(%d)\n", rc);
446                 RETURN(-ENOENT);
447         }
448         RETURN(0);
449 }
450
451 static int 
452 scan_audit_log(struct obd_device *obd, struct lustre_id *cur_id, 
453                struct list_head *list, struct lustre_id *parent_id)
454 {
455         struct name_item *item = NULL;
456         int rc = 0, mds_num = id_group(cur_id);
457         struct parseid_pkg *pkg = NULL;
458         ENTRY;
459
460         OBD_ALLOC(pkg, sizeof(*pkg));
461         if (pkg == NULL)
462                 RETURN(-ENOMEM);
463
464         pkg->pp_type = PP_AUDIT_LOG;
465         pkg->pp_id1 = *cur_id;
466
467         if (obd->u.mds.mds_num == mds_num) {
468                 rc = local_scan_audit_log(obd, pkg);
469         } else {
470                 struct ptlrpc_request *req;
471                 struct lmv_obd *lmv = &obd->u.mds.mds_md_obd->u.lmv;
472                 struct parseid_pkg *body;
473                 int size = sizeof(*body);
474                 struct obd_export *exp;
475                 
476                 /* make sure connection established */
477                 rc = obd_set_info(obd->u.mds.mds_md_exp, strlen("chkconnect"),
478                                   "chkconnect", 0, NULL);
479                 if (rc)
480                         RETURN(rc);
481
482                 exp = lmv->tgts[mds_num].ltd_exp;
483                 LASSERT(exp);
484
485                 req = ptlrpc_prep_req(class_exp2cliimp(exp), 
486                                       LUSTRE_MDS_VERSION, MDS_PARSE_ID, 1, 
487                                       &size, NULL);
488                 if (!req)
489                         RETURN(-ENOMEM);
490
491                 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
492                 memcpy(body, pkg, sizeof(*body));
493                 
494                 req->rq_replen = lustre_msg_size(1, &size);
495
496                 rc = ptlrpc_queue_wait(req);
497                 if (rc)
498                         GOTO(out_req, rc);
499
500                 body = lustre_swab_repbuf(req, 0, sizeof(*body), 
501                                           lustre_swab_parseid_pkg);
502                 if (body == NULL) {
503                         CERROR("can't unpack parseid_pkg\n");
504                         GOTO(out, rc = -EPROTO);
505                 }
506                 memcpy(pkg, body, sizeof(*pkg));
507 out_req:
508                 ptlrpc_req_finished(req);
509
510         }
511
512         if (!rc) rc = pkg->pp_rc;
513         if (rc)
514                 GOTO(out, rc);
515         
516         *parent_id = pkg->pp_id2;
517
518         OBD_ALLOC(item, sizeof(*item));
519         if (item == NULL)
520                 GOTO(out, rc = -ENOMEM);
521         
522         INIT_LIST_HEAD(&item->link);
523         list_add(&item->link, list);
524         memcpy(item->name, pkg->pp_name, sizeof(item->name));
525 out:
526         OBD_FREE(pkg, sizeof(*pkg));
527         RETURN(rc);
528 }
529        
530 int 
531 mds_audit_id2name(struct obd_device *obd, char **name, int *namelen, 
532                   struct lustre_id *id)
533 {
534         int rc = 0;
535         struct list_head list, *pos, *n;
536         struct name_item *item;
537         struct lustre_id parent_id, cur_id;
538         ENTRY;
539
540         *namelen = 0;
541         INIT_LIST_HEAD(&list);
542
543         cur_id = *id;
544         if (id_fid(&cur_id) == ROOT_FID)
545                 RETURN(0);
546 next:
547         memset(&parent_id, 0, sizeof(parent_id));
548
549         rc = mds_id2name(obd, &cur_id, &list, &parent_id);
550         if (rc == -ENOENT) {
551                 /* can't reconstruct name from id, turn to audit log */
552                 LASSERT(id_fid(&parent_id));
553                 cur_id = parent_id;
554                 memset(&parent_id, 0, sizeof(parent_id));
555
556                 rc = scan_audit_log(obd, &cur_id, &list, &parent_id);
557                 if (rc) {
558                         CERROR("scan id in audit log failed. (rc:%d)\n", rc);
559                         GOTO(out, rc);
560                 }
561
562                 LASSERT(id_fid(&parent_id));
563                 cur_id = parent_id;
564                 goto next;
565
566         } else if (rc) {
567                 CERROR("reconstruct name from id failed. (rc:%d)\n", rc);
568                 GOTO(out, rc);
569         }
570         
571         list_for_each_safe (pos, n, &list) {
572                 item = list_entry(pos, struct name_item, link);
573                 *namelen += strlen(item->name) + 1;
574         }
575         OBD_ALLOC(*name, *namelen);
576         if (*name == NULL)
577                 rc = -ENOMEM;
578 out:
579         list_for_each_safe (pos, n, &list) {
580                 item = list_entry(pos, struct name_item, link);
581                 if (!rc) {
582                         strcat(*name, "/");
583                         strcat(*name, item->name);
584                 }
585                 list_del_init(&item->link);
586                 OBD_FREE(item, sizeof(*item));
587         }
588         RETURN(rc);
589 }
590 EXPORT_SYMBOL(mds_audit_id2name);