Whamcloud - gitweb
port llog fixes from b1_6 into HEAD
[fs/lustre-release.git] / lustre / mds / mds_join.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_join.c
5  *  Lustre Metadata join handler file
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDS
29
30 #include <linux/fs.h>
31 #include <linux/jbd.h>
32 #include <linux/ext3_fs.h>
33 #include <obd_support.h>
34 #include <obd_class.h>
35 #include <obd.h>
36 #include <lustre_lib.h>
37 #include <lustre/lustre_idl.h>
38 #include <lustre_mds.h>
39 #include <lustre_dlm.h>
40 #include <lustre_log.h>
41 #include <lustre_fsfilt.h>
42 #include <lustre_lite.h>
43 #include <obd_lov.h>
44 #include "mds_internal.h"
45
46 struct mdsea_cb_data {
47     struct llog_handle     *mc_llh;
48     struct lov_mds_md      *mc_lmm;
49     struct lov_mds_md_join *mc_lmm_join;
50     __u64                   mc_offset;
51     __u64                   mc_headfile_sz;
52 };
53
54 static int mdsea_iterate(struct llog_handle *llh_tail, llog_cb_t cb,
55                          void *cbdata)
56 {
57     return llog_process(llh_tail, cb, cbdata, NULL);
58 }
59
60 static int mds_insert_join_lmm(struct llog_handle *llh,
61                                struct lov_mds_md *lmm,
62                                __u64 start, __u64 len,
63                                struct lov_mds_md_join *lmmj)
64 {
65         struct llog_rec_hdr rec;
66         struct mds_extent_desc *med;
67         int sz_med, rc;
68         ENTRY;
69
70
71         sz_med = lov_mds_md_size(le32_to_cpu(lmm->lmm_stripe_count));
72         sz_med += 2 * sizeof(__u64);
73         sz_med = size_round(sz_med);
74
75         rec.lrh_len  = cpu_to_le32(sz_med);
76         rec.lrh_type = cpu_to_le32(LLOG_JOIN_REC);
77
78         CDEBUG(D_INFO, "insert extent "LPU64":"LPU64" lmm \n", start, len);
79
80         OBD_ALLOC(med, sz_med);
81         if (med == NULL)
82                 RETURN(-ENOMEM);
83
84         med->med_start = start;
85         med->med_len = len;
86         memcpy(&med->med_lmm, lmm,
87                 lov_mds_md_size(le32_to_cpu(lmm->lmm_stripe_count)));
88
89         rc = llog_write_rec(llh, &rec, NULL, 0, med, -1);
90         OBD_FREE(med, sz_med);
91
92         if (lmmj) {
93                 /*modify lmmj for join stripe info*/
94                 lmmj->lmmj_md.lmm_stripe_count += lmm->lmm_stripe_count;
95                 lmmj->lmmj_extent_count ++;
96         }
97
98         RETURN(rc);
99 }
100
101 static int mdsea_append_extent(struct llog_handle *llh_tail,
102                                struct llog_rec_hdr *rec_in_tail,
103                                struct mdsea_cb_data *cbdata)
104 {
105         struct mds_extent_desc *med =
106                         &((struct llog_array_rec *)rec_in_tail)->lmr_med;
107         int rc;
108         ENTRY;
109
110         CDEBUG(D_INODE, "insert lmm extent: "LPU64":"LPU64" \n",
111                med->med_start, med->med_len);
112         rc = mds_insert_join_lmm(cbdata->mc_llh, &med->med_lmm,
113                                  med->med_start + cbdata->mc_headfile_sz,
114                                  med->med_len, cbdata->mc_lmm_join);
115         if (rc) {
116                 CERROR("error %d insert the lmm \n", rc);
117                 RETURN(rc);
118         }
119         RETURN(LLOG_DEL_RECORD);
120 }
121
122 static void mds_init_stripe_join(struct lov_mds_md_join *lmmj,
123                                  struct lov_mds_md *lmm,
124                                  struct llog_logid  *logid)
125 {
126         lmmj->lmmj_md.lmm_magic = cpu_to_le32(LOV_MAGIC_JOIN);
127         lmmj->lmmj_md.lmm_object_id = lmm->lmm_object_id;
128         lmmj->lmmj_md.lmm_object_gr = lmm->lmm_object_gr;
129         lmmj->lmmj_md.lmm_pattern = lmm->lmm_pattern;
130         lmmj->lmmj_md.lmm_stripe_size = lmm->lmm_stripe_size;
131         lmmj->lmmj_md.lmm_stripe_count = 0;
132         lmmj->lmmj_extent_count = 0;
133         lmmj->lmmj_array_id = *logid;
134 }
135
136 static int mdsea_cancel_last_extent(struct llog_handle *llh_tail,
137                                     struct llog_rec_hdr *rec_in_tail,
138                                     struct mdsea_cb_data *cbdata)
139 {
140         struct mds_extent_desc *med =
141                         &((struct llog_array_rec *)rec_in_tail)->lmr_med;
142
143         CDEBUG(D_INODE, "extent: "LPU64":"LPU64" \n",  med->med_start,
144                med->med_len);
145
146         LASSERTF(cbdata->mc_offset == med->med_start,
147                  "A hole in the extent "LPU64"--"LPU64"\n",
148                  cbdata->mc_offset, med->med_start);
149
150         if (med->med_len != -1)
151                 cbdata->mc_offset = med->med_start + med->med_len;
152
153         if (med->med_start > cbdata->mc_headfile_sz || (med->med_len == -1)) {
154                 CDEBUG(D_INFO, "del rec offset"LPU64", head size "LPU64" \n",
155                        med->med_start, cbdata->mc_headfile_sz);
156                 if (!cbdata->mc_lmm) {
157                         int stripe = le32_to_cpu(med->med_lmm.lmm_stripe_count);
158                         OBD_ALLOC(cbdata->mc_lmm, lov_mds_md_size(stripe));
159                         if (!cbdata->mc_lmm)
160                                 RETURN(-ENOMEM);
161                         memcpy(cbdata->mc_lmm, &med->med_lmm,
162                                lov_mds_md_size(stripe));
163                 }
164                 RETURN(LLOG_DEL_RECORD);
165         }
166         RETURN(0);
167 }
168
169 static int  mds_adjust_last_extent(struct llog_handle *llh_head,
170                                    __u64 head_size)
171 {
172         struct mdsea_cb_data  *cbdata;
173         int    rc;
174         ENTRY;
175
176         OBD_ALLOC_PTR(cbdata);
177
178         if (!cbdata)
179                 RETURN(-ENOMEM);
180
181         cbdata->mc_headfile_sz = head_size;
182         /*Find the last extent and cancel the record in the lmm*/
183         rc = mdsea_iterate(llh_head, (llog_cb_t)mdsea_cancel_last_extent,
184                            cbdata);
185
186         if (rc) {
187                 CERROR("can not find the last extent rc=%d\n", rc);
188                 GOTO(exit, rc);
189         }
190
191         LASSERT(cbdata->mc_lmm);
192
193         CDEBUG(D_INODE, "insert lmm extent: "LPU64":"LPU64" \n",
194                cbdata->mc_offset, (head_size - cbdata->mc_offset));
195
196         rc = mds_insert_join_lmm(llh_head, cbdata->mc_lmm,
197                                  cbdata->mc_offset,
198                                  (head_size - cbdata->mc_offset),
199                                  NULL);
200         if (rc)
201                 CERROR("error insert the lmm rc %d \n", rc);
202 exit:
203         if (cbdata && cbdata->mc_lmm)
204                 OBD_FREE(cbdata->mc_lmm,
205                          lov_mds_md_size(cbdata->mc_lmm->lmm_stripe_count));
206         if (cbdata)
207                 OBD_FREE_PTR(cbdata);
208
209         RETURN(rc);
210 }
211
212 static void mds_finish_join(struct mds_obd *mds, struct ptlrpc_request *req,
213                            struct inode *inode, struct lov_mds_md_join *lmmj)
214 {
215         struct mds_body *body = (struct mds_body *)
216                                 lustre_msg_buf(req->rq_repmsg, 1, 0);
217         int max_cookiesize = lmmj->lmmj_md.lmm_stripe_count *
218                                 sizeof(struct llog_cookie);
219         int max_easize = sizeof(*lmmj);
220
221         CDEBUG(D_INFO, "change the max md size from %d to "LPSZ"\n",
222                mds->mds_max_mdsize, sizeof(*lmmj));
223
224         if (mds->mds_max_mdsize < max_easize ||
225             mds->mds_max_cookiesize < max_cookiesize) {
226                 body->max_mdsize = mds->mds_max_mdsize > max_easize ?
227                                    mds->mds_max_mdsize : max_easize;
228                 mds->mds_max_mdsize = body->max_mdsize;
229                 body->max_cookiesize = mds->mds_max_cookiesize > max_cookiesize?
230                                    mds->mds_max_cookiesize : max_cookiesize;
231                 mds->mds_max_cookiesize = body->max_cookiesize;
232                 body->valid |= OBD_MD_FLMODEASIZE;
233         }
234
235         if (body->valid & OBD_MD_FLMODEASIZE)
236                 CDEBUG(D_INODE, "updating max_mdsize/max_cookiesize: %d/%d\n",
237                        mds->mds_max_mdsize, mds->mds_max_cookiesize);
238
239         mds_pack_inode2fid(&body->fid1, inode);
240         mds_pack_inode2body(body, inode);
241 }
242
243 static int mds_join_unlink_tail_inode(struct mds_update_record *rec,
244                                       struct ptlrpc_request *req,
245                                       struct mds_rec_join *join_rec,
246                                       struct lov_mds_md *tail_lmm,
247                                       int lmm_size, struct dentry *dchild,
248                                       void **handle,struct lustre_handle *lockh)
249 {
250         struct mds_obd *mds = mds_req2mds(req);
251         struct obd_device *obd = req->rq_export->exp_obd;
252         struct inode *tail_inode, *head_inode;
253         struct dentry *de_tailparent = NULL, *de_tail = NULL, *de_head = NULL;
254         struct lustre_handle dlm_handles[4] = {{0}, {0}, {0}, {0}};
255         struct ll_fid head_fid;
256         int rc;
257         ENTRY;
258
259         if (lockh)
260                 ldlm_lock_decref(lockh, LCK_EX);
261
262         head_inode = dchild->d_inode;
263
264         head_fid.id = head_inode->i_ino;
265         head_fid.generation = head_inode->i_generation;
266         head_fid.f_type = head_inode->i_mode & S_IFMT;
267
268         rc = mds_get_parents_children_locked(obd, mds, &join_rec->jr_fid,
269                                              &de_tailparent, &head_fid,
270                                              &de_head, LCK_EX, rec->ur_name,
271                                              rec->ur_namelen, &de_tail,
272                                              NULL, 0, NULL, dlm_handles,
273                                              LCK_EX);
274         if (rc)
275                 GOTO(cleanup, rc);
276
277         *lockh = dlm_handles[1];
278         LASSERT(de_tailparent);
279         tail_inode = de_tail->d_inode;
280         if (tail_inode == NULL) {
281                 CERROR("tail inode doesn't exist(dir %lu,name %s)!\n",
282                        de_tailparent? de_tailparent->d_inode->i_ino : 0,
283                        rec->ur_name);
284                 GOTO(cleanup, rc = -ENOENT);
285         }
286
287         if (!S_ISREG(tail_inode->i_mode)) {
288                 CERROR("tail file is not a regular file (dir %lu, name %s)!\n",
289                        de_tailparent? de_tailparent->d_inode->i_ino : 0,
290                        rec->ur_name);
291                 GOTO(cleanup, rc = -EINVAL);
292         }
293
294         *handle = fsfilt_start(obd, head_inode, FSFILT_OP_JOIN, NULL);
295         if (IS_ERR(*handle)) {
296                 rc = PTR_ERR(*handle);
297                 GOTO(cleanup, rc);
298         }
299
300         rc = mds_get_md(obd, tail_inode, tail_lmm, &lmm_size, 1);
301         if (rc < 0) /* get md fails */
302                 GOTO(cleanup, rc);
303
304         LASSERT(le32_to_cpu(tail_lmm->lmm_magic) == LOV_MAGIC_JOIN ||
305                 le32_to_cpu(tail_lmm->lmm_magic) == LOV_MAGIC);
306
307         LASSERT(de_tailparent);
308         rc = vfs_unlink(de_tailparent->d_inode, de_tail);
309
310         if (rc == 0) {
311                 CDEBUG(D_INODE, "delete the tail inode %lu/%u \n",
312                        tail_inode->i_ino, tail_inode->i_generation);
313         }
314 cleanup:
315         if (dlm_handles[2].cookie != 0)
316                 ldlm_lock_decref(&dlm_handles[2], LCK_EX);
317
318         if (dlm_handles[0].cookie != 0) {
319                 if (rc)
320                         ldlm_lock_decref(&dlm_handles[0], LCK_EX);
321                 else
322                         ptlrpc_save_lock(req, &dlm_handles[0], LCK_EX);
323         }
324         if (de_tail)
325                 l_dput(de_tail);
326
327         if (de_tailparent)
328                 l_dput(de_tailparent);
329
330         if (de_head)
331                 l_dput(de_head);
332
333         RETURN(rc);
334 }
335
336 int mds_join_file(struct mds_update_record *rec, struct ptlrpc_request *req,
337                   struct dentry *de_head, struct lustre_handle *lockh)
338 {
339         struct mds_obd *mds = mds_req2mds(req);
340         struct obd_device *obd = req->rq_export->exp_obd;
341         struct inode *head_inode = NULL;
342         struct lvfs_run_ctxt saved;
343         void *handle = NULL;
344         struct lov_mds_md *head_lmm, *tail_lmm;
345         struct lov_mds_md_join *head_lmmj = NULL, *tail_lmmj = NULL;
346         int lmm_size, rc = 0, cleanup_phase = 0, size;
347         struct llog_handle *llh_head = NULL, *llh_tail = NULL;
348         struct llog_ctxt *ctxt = NULL;
349         struct mds_rec_join *join_rec;
350         ENTRY;
351
352         join_rec = lustre_swab_reqbuf(req, DLM_INTENT_REC_OFF + 3,
353                                       sizeof(*join_rec),
354                                       lustre_swab_mds_rec_join);
355         if (join_rec == NULL)
356                 RETURN (-EFAULT);
357
358         DEBUG_REQ(D_INODE, req,"head "LPU64"/%u, ptail ino "LPU64"/%u, tail %s",
359                   rec->ur_fid1->id, rec->ur_fid1->generation,
360                   join_rec->jr_fid.id, join_rec->jr_fid.generation,
361                   rec->ur_name);
362
363         size = mds->mds_max_mdsize;
364         lmm_size = mds->mds_max_mdsize;
365         OBD_ALLOC(head_lmm, lmm_size);
366         OBD_ALLOC(tail_lmm, lmm_size);
367         if (!head_lmm || !tail_lmm)
368                 GOTO(cleanup, rc = -ENOMEM);
369
370         /* acquire head's dentry */
371         LASSERT(de_head);
372         head_inode = de_head->d_inode;
373         if (head_inode == NULL) {
374                 CERROR("head inode doesn't exist!\n");
375                 GOTO(cleanup, rc = -ENOENT);
376         }
377
378         /*Unlink tail inode and get the lmm back*/
379         rc = mds_join_unlink_tail_inode(rec, req, join_rec, tail_lmm, lmm_size,
380                                         de_head, &handle, lockh);
381         if (rc) {
382                 CERROR("unlink tail_inode error %d\n", rc);
383                 GOTO(cleanup, rc);
384         }
385
386         LOCK_INODE_MUTEX(head_inode);
387         cleanup_phase = 1;
388         rc = mds_get_md(obd, head_inode, head_lmm, &size, 0);
389         if (rc < 0)
390                 GOTO(cleanup, rc);
391
392         LASSERT(le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC_JOIN ||
393                 le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC);
394
395         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
396         ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
397         LASSERT(ctxt != NULL);
398         cleanup_phase = 2;
399         if (le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC) { /*simple file */
400                 struct llog_logid *llog_array;
401
402                 rc = llog_create(ctxt, &llh_head, NULL, NULL);
403                 if (rc) {
404                         CERROR("cannot create new log, error = %d\n", rc);
405                         GOTO(cleanup, rc);
406                 }
407                 cleanup_phase = 3;
408                 llog_array = &llh_head->lgh_id;
409                 CDEBUG(D_INFO,"create arrary for %lu with id "LPU64":"LPU64"\n",
410                        head_inode->i_ino, llog_array->lgl_oid,
411                        llog_array->lgl_ogr);
412                 rc = llog_init_handle(llh_head, LLOG_F_IS_PLAIN, NULL);
413                 if (rc)
414                         GOTO(cleanup, rc);
415                 OBD_ALLOC_PTR(head_lmmj);
416                 if (head_lmmj == NULL)
417                         GOTO(cleanup, rc = -ENOMEM);
418                 mds_init_stripe_join(head_lmmj, head_lmm, llog_array);
419                 mds_insert_join_lmm(llh_head, head_lmm, 0,join_rec->jr_headsize,
420                                     head_lmmj);
421         } else { /*head lmm is join file */
422                 head_lmmj = (struct lov_mds_md_join *)head_lmm;
423                 /* construct and fill extent llog object */
424                 rc = llog_create(ctxt, &llh_head,
425                                  &head_lmmj->lmmj_array_id, NULL);
426                 if (rc) {
427                         CERROR("cannot open existing log, error = %d\n", rc);
428                         GOTO(cleanup, rc);
429                 }
430                 cleanup_phase = 3;
431                 rc = llog_init_handle(llh_head, LLOG_F_IS_PLAIN, NULL);
432                 if (rc)
433                         GOTO(cleanup, rc);
434                 rc = mds_adjust_last_extent(llh_head, join_rec->jr_headsize);
435                 if (rc) {
436                         CERROR("can't adjust last extent of obj rc=%d\n", rc);
437                         GOTO(cleanup, rc);
438                 }
439         }
440
441         if (le32_to_cpu(tail_lmm->lmm_magic) != LOV_MAGIC_JOIN) {
442                 mds_insert_join_lmm(llh_head, tail_lmm, join_rec->jr_headsize,
443                                     -1, head_lmmj);
444         } else {
445                 struct mdsea_cb_data cbdata;
446                 tail_lmmj = (struct lov_mds_md_join *)tail_lmm;
447
448                 rc = llog_create(ctxt,&llh_tail,&tail_lmmj->lmmj_array_id,NULL);
449                 if (rc) {
450                         CERROR("cannot open existing log, error = %d\n", rc);
451                         GOTO(cleanup, rc);
452                 }
453                 rc = llog_init_handle(llh_tail, LLOG_F_IS_PLAIN, NULL);
454                 if (rc) {
455                         llog_close(llh_tail);
456                         GOTO(cleanup, rc);
457                 }
458                 cbdata.mc_llh = llh_head;
459                 cbdata.mc_headfile_sz = join_rec->jr_headsize;
460                 cbdata.mc_lmm_join = head_lmmj;
461                 rc = mdsea_iterate(llh_tail, (llog_cb_t)mdsea_append_extent,
462                                    &cbdata);
463                 if (rc) {
464                         llog_close(llh_tail);
465                         CERROR("can not append extent log error %d \n", rc);
466                         GOTO(cleanup, rc);
467                 }
468                 rc = llog_destroy(llh_tail);
469                 if (rc) {
470                         llog_close(llh_tail);
471                         CERROR("can not destroy log error %d \n", rc);
472                         GOTO(cleanup, rc);
473                 }
474                 llog_free_handle(llh_tail);
475         }
476         LASSERT(head_inode);
477         CDEBUG(D_INODE, "join finish, set lmm V2 to inode %lu \n",
478                head_inode->i_ino);
479         fsfilt_set_md(obd, head_inode, handle, head_lmmj,
480                       sizeof(struct lov_mds_md_join), "lov");
481         mds_finish_join(mds, req, head_inode, head_lmmj);
482 cleanup:
483         rc = mds_finish_transno(mds, head_inode, handle, req, rc, 0, 0);
484         switch(cleanup_phase){
485         case 3:
486                 llog_close(llh_head);
487         case 2:
488                 llog_ctxt_put(ctxt);
489                 if (head_lmmj && ((void*)head_lmmj != (void*)head_lmm))
490                         OBD_FREE_PTR(head_lmmj);
491
492                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
493         case 1:
494                 UNLOCK_INODE_MUTEX(head_inode);
495         case 0:
496                 if (tail_lmm != NULL)
497                         OBD_FREE(tail_lmm, lmm_size);
498                 if (head_lmm != NULL)
499                         OBD_FREE(head_lmm, lmm_size);
500                 break;
501         default:
502                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
503                 LBUG();
504         }
505         req->rq_status = rc;
506         RETURN(rc);
507 }
508