Whamcloud - gitweb
Changelog update
[fs/lustre-release.git] / lustre / mds / mds_join.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_join.c
37  *
38  * Lustre Metadata join handler file
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/fs.h>
49 #include <linux/jbd.h>
50 #include <linux/ext3_fs.h>
51 #include <obd_support.h>
52 #include <obd_class.h>
53 #include <obd.h>
54 #include <lustre_lib.h>
55 #include <lustre/lustre_idl.h>
56 #include <lustre_mds.h>
57 #include <lustre_dlm.h>
58 #include <lustre_log.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_lite.h>
61 #include <obd_lov.h>
62 #include "mds_internal.h"
63
/*
 * State handed to the llog callbacks in this file
 * (mdsea_append_extent() and mdsea_cancel_last_extent()).
 */
struct mdsea_cb_data {
    /* llog that mdsea_append_extent() writes the re-based records to */
    struct llog_handle     *mc_llh;
    /* copy of the lmm of the first record mdsea_cancel_last_extent()
     * decides to delete; allocated by that callback */
    struct lov_mds_md      *mc_lmm;
    /* passed as the lmmj argument of mds_insert_join_lmm() by
     * mdsea_append_extent() */
    struct lov_mds_md_join *mc_lmm_join;
    /* start offset mdsea_cancel_last_extent() expects for the next record;
     * advanced to med_start + med_len after each bounded extent */
    __u64                   mc_offset;
    /* head file size: added to med_start when appending tail extents and
     * compared against med_start when trimming the head's last extent */
    __u64                   mc_headfile_sz;
};
71
72 static int mdsea_iterate(struct llog_handle *llh_tail, llog_cb_t cb,
73                          void *cbdata)
74 {
75     return llog_process(llh_tail, cb, cbdata, NULL);
76 }
77
78 static int mds_insert_join_lmm(struct llog_handle *llh,
79                                struct lov_mds_md *lmm,
80                                __u64 start, __u64 len,
81                                struct lov_mds_md_join *lmmj)
82 {
83         struct llog_rec_hdr rec;
84         struct mds_extent_desc *med;
85         int sz_med, rc;
86         ENTRY;
87
88
89         sz_med = lov_mds_md_size(le32_to_cpu(lmm->lmm_stripe_count),
90                                  LOV_MAGIC);
91         sz_med += 2 * sizeof(__u64);
92         sz_med = size_round(sz_med);
93
94         rec.lrh_len  = cpu_to_le32(sz_med);
95         rec.lrh_type = cpu_to_le32(LLOG_JOIN_REC);
96
97         CDEBUG(D_INFO, "insert extent "LPU64":"LPU64" lmm \n", start, len);
98
99         OBD_ALLOC(med, sz_med);
100         if (med == NULL)
101                 RETURN(-ENOMEM);
102
103         med->med_start = start;
104         med->med_len = len;
105         memcpy(&med->med_lmm, lmm,
106                 lov_mds_md_size(le32_to_cpu(lmm->lmm_stripe_count),
107                                 LOV_MAGIC));
108
109         rc = llog_write_rec(llh, &rec, NULL, 0, med, -1);
110         OBD_FREE(med, sz_med);
111
112         if (lmmj) {
113                 /*modify lmmj for join stripe info*/
114                 lmmj->lmmj_md.lmm_stripe_count += lmm->lmm_stripe_count;
115                 lmmj->lmmj_extent_count ++;
116         }
117
118         RETURN(rc);
119 }
120
121 static int mdsea_append_extent(struct llog_handle *llh_tail,
122                                struct llog_rec_hdr *rec_in_tail,
123                                struct mdsea_cb_data *cbdata)
124 {
125         struct mds_extent_desc *med =
126                         &((struct llog_array_rec *)rec_in_tail)->lmr_med;
127         int rc;
128         ENTRY;
129
130         CDEBUG(D_INODE, "insert lmm extent: "LPU64":"LPU64" \n",
131                med->med_start, med->med_len);
132         rc = mds_insert_join_lmm(cbdata->mc_llh, &med->med_lmm,
133                                  med->med_start + cbdata->mc_headfile_sz,
134                                  med->med_len, cbdata->mc_lmm_join);
135         if (rc) {
136                 CERROR("error %d insert the lmm \n", rc);
137                 RETURN(rc);
138         }
139         RETURN(LLOG_DEL_RECORD);
140 }
141
142 static void mds_init_stripe_join(struct lov_mds_md_join *lmmj,
143                                  struct lov_mds_md *lmm,
144                                  struct llog_logid  *logid)
145 {
146         lmmj->lmmj_md.lmm_magic = cpu_to_le32(LOV_MAGIC_JOIN);
147         lmmj->lmmj_md.lmm_object_id = lmm->lmm_object_id;
148         lmmj->lmmj_md.lmm_object_gr = lmm->lmm_object_gr;
149         lmmj->lmmj_md.lmm_pattern = lmm->lmm_pattern;
150         lmmj->lmmj_md.lmm_stripe_size = lmm->lmm_stripe_size;
151         lmmj->lmmj_md.lmm_stripe_count = 0;
152         lmmj->lmmj_extent_count = 0;
153         lmmj->lmmj_array_id = *logid;
154 }
155
156 static int mdsea_cancel_last_extent(struct llog_handle *llh_tail,
157                                     struct llog_rec_hdr *rec_in_tail,
158                                     struct mdsea_cb_data *cbdata)
159 {
160         struct mds_extent_desc *med =
161                         &((struct llog_array_rec *)rec_in_tail)->lmr_med;
162
163         CDEBUG(D_INODE, "extent: "LPU64":"LPU64" \n",  med->med_start,
164                med->med_len);
165
166         LASSERTF(cbdata->mc_offset == med->med_start,
167                  "A hole in the extent "LPU64"--"LPU64"\n",
168                  cbdata->mc_offset, med->med_start);
169
170         if (med->med_len != -1)
171                 cbdata->mc_offset = med->med_start + med->med_len;
172
173         if (med->med_start > cbdata->mc_headfile_sz || (med->med_len == -1)) {
174                 CDEBUG(D_INFO, "del rec offset"LPU64", head size "LPU64" \n",
175                        med->med_start, cbdata->mc_headfile_sz);
176                 if (!cbdata->mc_lmm) {
177                         int stripe = le32_to_cpu(med->med_lmm.lmm_stripe_count);
178                         OBD_ALLOC(cbdata->mc_lmm,
179                                   lov_mds_md_size(stripe, LOV_MAGIC));
180                         if (!cbdata->mc_lmm)
181                                 RETURN(-ENOMEM);
182                         memcpy(cbdata->mc_lmm, &med->med_lmm,
183                                lov_mds_md_size(stripe, LOV_MAGIC));
184                 }
185                 RETURN(LLOG_DEL_RECORD);
186         }
187         RETURN(0);
188 }
189
190 static int  mds_adjust_last_extent(struct llog_handle *llh_head,
191                                    __u64 head_size)
192 {
193         struct mdsea_cb_data  *cbdata;
194         int    rc;
195         ENTRY;
196
197         OBD_ALLOC_PTR(cbdata);
198
199         if (!cbdata)
200                 RETURN(-ENOMEM);
201
202         cbdata->mc_headfile_sz = head_size;
203         /*Find the last extent and cancel the record in the lmm*/
204         rc = mdsea_iterate(llh_head, (llog_cb_t)mdsea_cancel_last_extent,
205                            cbdata);
206
207         if (rc) {
208                 CERROR("can not find the last extent rc=%d\n", rc);
209                 GOTO(exit, rc);
210         }
211
212         LASSERT(cbdata->mc_lmm);
213
214         CDEBUG(D_INODE, "insert lmm extent: "LPU64":"LPU64" \n",
215                cbdata->mc_offset, (head_size - cbdata->mc_offset));
216
217         rc = mds_insert_join_lmm(llh_head, cbdata->mc_lmm,
218                                  cbdata->mc_offset,
219                                  (head_size - cbdata->mc_offset),
220                                  NULL);
221         if (rc)
222                 CERROR("error insert the lmm rc %d \n", rc);
223 exit:
224         if (cbdata && cbdata->mc_lmm)
225                 OBD_FREE(cbdata->mc_lmm,
226                          lov_mds_md_size(cbdata->mc_lmm->lmm_stripe_count,
227                                          LOV_MAGIC));
228         if (cbdata)
229                 OBD_FREE_PTR(cbdata);
230
231         RETURN(rc);
232 }
233
234 static void mds_finish_join(struct mds_obd *mds, struct ptlrpc_request *req,
235                            struct inode *inode, struct lov_mds_md_join *lmmj)
236 {
237         struct mds_body *body = lustre_msg_buf(req->rq_repmsg,DLM_REPLY_REC_OFF,
238                                                sizeof(*body));
239         int max_cookiesize = lmmj->lmmj_md.lmm_stripe_count *
240                                 sizeof(struct llog_cookie);
241         int max_easize = sizeof(*lmmj);
242
243         CDEBUG(D_INFO, "change the max md size from %d to "LPSZ"\n",
244                mds->mds_max_mdsize, sizeof(*lmmj));
245
246         if (mds->mds_max_mdsize < max_easize ||
247             mds->mds_max_cookiesize < max_cookiesize) {
248                 body->max_mdsize = mds->mds_max_mdsize > max_easize ?
249                                    mds->mds_max_mdsize : max_easize;
250                 mds->mds_max_mdsize = body->max_mdsize;
251                 body->max_cookiesize = mds->mds_max_cookiesize > max_cookiesize?
252                                    mds->mds_max_cookiesize : max_cookiesize;
253                 mds->mds_max_cookiesize = body->max_cookiesize;
254                 body->valid |= OBD_MD_FLMODEASIZE;
255         }
256
257         if (body->valid & OBD_MD_FLMODEASIZE)
258                 CDEBUG(D_INODE, "updating max_mdsize/max_cookiesize: %d/%d\n",
259                        mds->mds_max_mdsize, mds->mds_max_cookiesize);
260
261         mds_pack_inode2body(body, inode);
262 }
263
264 static int mds_join_unlink_tail_inode(struct mds_update_record *rec,
265                                       struct ptlrpc_request *req,
266                                       struct mds_rec_join *join_rec,
267                                       struct lov_mds_md *tail_lmm,
268                                       int lmm_size, struct dentry *dchild,
269                                       void **handle,struct lustre_handle *lockh)
270 {
271         struct mds_obd *mds = mds_req2mds(req);
272         struct obd_device *obd = req->rq_export->exp_obd;
273         struct inode *tail_inode, *head_inode;
274         struct dentry *de_tailparent = NULL, *de_tail = NULL, *de_head = NULL;
275         struct lustre_handle dlm_handles[4] = {{0}, {0}, {0}, {0}};
276         struct ll_fid head_fid;
277         int rc;
278         ENTRY;
279
280         if (lockh)
281                 ldlm_lock_decref(lockh, LCK_EX);
282
283         head_inode = dchild->d_inode;
284         ll_pack_fid(&head_fid, head_inode->i_ino, head_inode->i_generation,
285                       head_inode->i_mode & S_IFMT);
286
287         rc = mds_get_parents_children_locked(obd, mds, &join_rec->jr_fid,
288                                              &de_tailparent, &head_fid,
289                                              &de_head, LCK_EX, rec->ur_name,
290                                              rec->ur_namelen, &de_tail,
291                                              NULL, 0, NULL, dlm_handles,
292                                              LCK_EX);
293         if (rc)
294                 GOTO(cleanup, rc);
295
296         *lockh = dlm_handles[1];
297         LASSERT(de_tailparent);
298         tail_inode = de_tail->d_inode;
299         if (tail_inode == NULL) {
300                 CERROR("tail inode doesn't exist(dir %lu,name %s)!\n",
301                        de_tailparent? de_tailparent->d_inode->i_ino : 0,
302                        rec->ur_name);
303                 GOTO(cleanup, rc = -ENOENT);
304         }
305
306         if (!S_ISREG(tail_inode->i_mode)) {
307                 CERROR("tail file is not a regular file (dir %lu, name %s)!\n",
308                        de_tailparent? de_tailparent->d_inode->i_ino : 0,
309                        rec->ur_name);
310                 GOTO(cleanup, rc = -EINVAL);
311         }
312
313         *handle = fsfilt_start(obd, head_inode, FSFILT_OP_JOIN, NULL);
314         if (IS_ERR(*handle)) {
315                 rc = PTR_ERR(*handle);
316                 GOTO(cleanup, rc);
317         }
318
319         rc = mds_get_md(obd, tail_inode, tail_lmm, &lmm_size, 1, 0,
320                         req->rq_export->exp_connect_flags);
321         if (rc < 0) /* get md fails */
322                 GOTO(cleanup, rc);
323
324         LASSERT(le32_to_cpu(tail_lmm->lmm_magic) == LOV_MAGIC_JOIN ||
325                 le32_to_cpu(tail_lmm->lmm_magic) == LOV_MAGIC);
326
327         LASSERT(de_tailparent);
328         LOCK_INODE_MUTEX(de_tailparent->d_inode);
329         rc = ll_vfs_unlink(de_tailparent->d_inode, de_tail, mds->mds_vfsmnt);
330         UNLOCK_INODE_MUTEX(de_tailparent->d_inode);
331
332         if (rc == 0) {
333                 CDEBUG(D_INODE, "delete the tail inode %lu/%u \n",
334                        tail_inode->i_ino, tail_inode->i_generation);
335         }
336 cleanup:
337         if (dlm_handles[2].cookie != 0)
338                 ldlm_lock_decref(&dlm_handles[2], LCK_EX);
339
340         if (dlm_handles[0].cookie != 0) {
341                 if (rc)
342                         ldlm_lock_decref(&dlm_handles[0], LCK_EX);
343                 else
344                         ptlrpc_save_lock(req, &dlm_handles[0], LCK_EX);
345         }
346         if (de_tail)
347                 l_dput(de_tail);
348
349         if (de_tailparent)
350                 l_dput(de_tailparent);
351
352         if (de_head)
353                 l_dput(de_head);
354
355         RETURN(rc);
356 }
357
358 int mds_join_file(struct mds_update_record *rec, struct ptlrpc_request *req,
359                   struct dentry *de_head, struct lustre_handle *lockh)
360 {
361         struct mds_obd *mds = mds_req2mds(req);
362         struct obd_device *obd = req->rq_export->exp_obd;
363         struct inode *inodes[PTLRPC_NUM_VERSIONS] = { NULL };
364         struct inode *head_inode = NULL;
365         struct lvfs_run_ctxt saved;
366         void *handle = NULL;
367         struct lov_mds_md *head_lmm, *tail_lmm;
368         struct lov_mds_md_join *head_lmmj = NULL, *tail_lmmj = NULL;
369         int lmm_size, rc = 0, cleanup_phase = 0, size;
370         struct llog_handle *llh_head = NULL, *llh_tail = NULL;
371         struct llog_ctxt *ctxt = NULL;
372         struct mds_rec_join *join_rec;
373         ENTRY;
374
375         join_rec = lustre_swab_reqbuf(req, DLM_INTENT_REC_OFF + 3,
376                                       sizeof(*join_rec),
377                                       lustre_swab_mds_rec_join);
378         if (join_rec == NULL)
379                 RETURN (-EFAULT);
380
381         DEBUG_REQ(D_INODE, req,"head "LPU64"/%u, ptail ino "LPU64"/%u, tail %s",
382                   rec->ur_fid1->id, rec->ur_fid1->generation,
383                   join_rec->jr_fid.id, join_rec->jr_fid.generation,
384                   rec->ur_name);
385
386         size = mds->mds_max_mdsize;
387         lmm_size = mds->mds_max_mdsize;
388         OBD_ALLOC(head_lmm, lmm_size);
389         OBD_ALLOC(tail_lmm, lmm_size);
390         if (!head_lmm || !tail_lmm)
391                 GOTO(cleanup, rc = -ENOMEM);
392
393         /* acquire head's dentry */
394         LASSERT(de_head);
395         head_inode = de_head->d_inode;
396         if (head_inode == NULL) {
397                 CERROR("head inode doesn't exist!\n");
398                 GOTO(cleanup, rc = -ENOENT);
399         }
400
401         /*Unlink tail inode and get the lmm back*/
402         rc = mds_join_unlink_tail_inode(rec, req, join_rec, tail_lmm, lmm_size,
403                                         de_head, &handle, lockh);
404         if (rc) {
405                 CERROR("unlink tail_inode error %d\n", rc);
406                 GOTO(cleanup, rc);
407         }
408
409         LOCK_INODE_MUTEX(head_inode);
410         cleanup_phase = 1;
411         rc = mds_get_md(obd, head_inode, head_lmm, &size, 0, 0,
412                         req->rq_export->exp_connect_flags);
413         if (rc < 0)
414                 GOTO(cleanup, rc);
415
416         LASSERT(le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC_JOIN ||
417                 le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC);
418
419         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
420         ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
421         LASSERT(ctxt != NULL);
422         cleanup_phase = 2;
423         if (le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC) { /*simple file */
424                 struct llog_logid *llog_array;
425
426                 rc = llog_create(ctxt, &llh_head, NULL, NULL);
427                 if (rc) {
428                         CERROR("cannot create new log, error = %d\n", rc);
429                         GOTO(cleanup, rc);
430                 }
431                 cleanup_phase = 3;
432                 llog_array = &llh_head->lgh_id;
433                 CDEBUG(D_INFO,"create arrary for %lu with id "LPU64":"LPU64"\n",
434                        head_inode->i_ino, llog_array->lgl_oid,
435                        llog_array->lgl_ogr);
436                 rc = llog_init_handle(llh_head, LLOG_F_IS_PLAIN, NULL);
437                 if (rc)
438                         GOTO(cleanup, rc);
439                 OBD_ALLOC_PTR(head_lmmj);
440                 if (head_lmmj == NULL)
441                         GOTO(cleanup, rc = -ENOMEM);
442                 mds_init_stripe_join(head_lmmj, head_lmm, llog_array);
443                 mds_insert_join_lmm(llh_head, head_lmm, 0,join_rec->jr_headsize,
444                                     head_lmmj);
445         } else { /*head lmm is join file */
446                 head_lmmj = (struct lov_mds_md_join *)head_lmm;
447                 /* construct and fill extent llog object */
448                 rc = llog_create(ctxt, &llh_head,
449                                  &head_lmmj->lmmj_array_id, NULL);
450                 if (rc) {
451                         CERROR("cannot open existing log, error = %d\n", rc);
452                         GOTO(cleanup, rc);
453                 }
454                 cleanup_phase = 3;
455                 rc = llog_init_handle(llh_head, LLOG_F_IS_PLAIN, NULL);
456                 if (rc)
457                         GOTO(cleanup, rc);
458                 rc = mds_adjust_last_extent(llh_head, join_rec->jr_headsize);
459                 if (rc) {
460                         CERROR("can't adjust last extent of obj rc=%d\n", rc);
461                         GOTO(cleanup, rc);
462                 }
463         }
464
465         if (le32_to_cpu(tail_lmm->lmm_magic) != LOV_MAGIC_JOIN) {
466                 mds_insert_join_lmm(llh_head, tail_lmm, join_rec->jr_headsize,
467                                     -1, head_lmmj);
468         } else {
469                 struct mdsea_cb_data cbdata;
470                 tail_lmmj = (struct lov_mds_md_join *)tail_lmm;
471
472                 rc = llog_create(ctxt,&llh_tail,&tail_lmmj->lmmj_array_id,NULL);
473                 if (rc) {
474                         CERROR("cannot open existing log, error = %d\n", rc);
475                         GOTO(cleanup, rc);
476                 }
477                 rc = llog_init_handle(llh_tail, LLOG_F_IS_PLAIN, NULL);
478                 if (rc) {
479                         llog_close(llh_tail);
480                         GOTO(cleanup, rc);
481                 }
482                 cbdata.mc_llh = llh_head;
483                 cbdata.mc_headfile_sz = join_rec->jr_headsize;
484                 cbdata.mc_lmm_join = head_lmmj;
485                 rc = mdsea_iterate(llh_tail, (llog_cb_t)mdsea_append_extent,
486                                    &cbdata);
487                 if (rc) {
488                         llog_close(llh_tail);
489                         CERROR("can not append extent log error %d \n", rc);
490                         GOTO(cleanup, rc);
491                 }
492                 rc = llog_destroy(llh_tail);
493                 if (rc) {
494                         llog_close(llh_tail);
495                         CERROR("can not destroy log error %d \n", rc);
496                         GOTO(cleanup, rc);
497                 }
498                 llog_free_handle(llh_tail);
499         }
500         LASSERT(head_inode);
501         CDEBUG(D_INODE, "join finish, set lmm V2 to inode %lu \n",
502                head_inode->i_ino);
503         fsfilt_set_md(obd, head_inode, handle, head_lmmj,
504                       sizeof(struct lov_mds_md_join), "lov");
505         mds_finish_join(mds, req, head_inode, head_lmmj);
506 cleanup:
507         inodes[0] = head_inode;
508         rc = mds_finish_transno(mds, inodes, handle, req, rc, 0, 0);
509         switch(cleanup_phase){
510         case 3:
511                 llog_close(llh_head);
512         case 2:
513                 llog_ctxt_put(ctxt);
514                 if (head_lmmj && ((void*)head_lmmj != (void*)head_lmm))
515                         OBD_FREE_PTR(head_lmmj);
516
517                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
518         case 1:
519                 UNLOCK_INODE_MUTEX(head_inode);
520         case 0:
521                 if (tail_lmm != NULL)
522                         OBD_FREE(tail_lmm, lmm_size);
523                 if (head_lmm != NULL)
524                         OBD_FREE(head_lmm, lmm_size);
525                 break;
526         default:
527                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
528                 LBUG();
529         }
530         req->rq_status = rc;
531         RETURN(rc);
532 }