1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_join.c
38 * Lustre Metadata join handler file
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
49 #include <linux/jbd.h>
50 #include <linux/ext3_fs.h>
51 #include <obd_support.h>
52 #include <obd_class.h>
54 #include <lustre_lib.h>
55 #include <lustre/lustre_idl.h>
56 #include <lustre_mds.h>
57 #include <lustre_dlm.h>
58 #include <lustre_log.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_lite.h>
62 #include "mds_internal.h"
/*
 * State handed to the llog callbacks in this file (mdsea_append_extent and
 * mdsea_cancel_last_extent) through the cbdata argument of mdsea_iterate().
 */
64 struct mdsea_cb_data {
65 struct llog_handle *mc_llh; /* log that mdsea_append_extent() writes copied extents into */
66 struct lov_mds_md *mc_lmm; /* private copy of an extent's lmm, allocated by mdsea_cancel_last_extent() */
67 struct lov_mds_md_join *mc_lmm_join; /* join descriptor whose counters mds_insert_join_lmm() updates */
/*
 * Walk the records of the extent log llh_tail, invoking cb on each one with
 * cbdata as its private argument.  This is a direct wrapper around
 * llog_process(); its return value is passed back unchanged.
 */
72 static int mdsea_iterate(struct llog_handle *llh_tail, llog_cb_t cb,
75 return llog_process(llh_tail, cb, cbdata, NULL);
/*
 * Append one extent record describing the striping lmm to the log llh, and
 * account for it in the join descriptor lmmj.
 *
 * llh   - log the LLOG_JOIN_REC record is written to
 * lmm   - striping descriptor copied into the record (stripe count is read
 *         with le32_to_cpu, i.e. treated as little-endian)
 * start - value stored in the record's med_start
 * len   - extent length; only its use in the debug message is shown below
 * lmmj  - join descriptor whose stripe and extent counters are increased
 */
78 static int mds_insert_join_lmm(struct llog_handle *llh,
79 struct lov_mds_md *lmm,
80 __u64 start, __u64 len,
81 struct lov_mds_md_join *lmmj)
83 struct llog_rec_hdr rec;
84 struct mds_extent_desc *med;
/* Record payload size: the lmm for this stripe count ... */
89 sz_med = lov_mds_md_size(le32_to_cpu(lmm->lmm_stripe_count),
/* ... plus two __u64 values (presumably med_start and med_len - confirm
 * against struct mds_extent_desc), rounded up by size_round(). */
91 sz_med += 2 * sizeof(__u64);
92 sz_med = size_round(sz_med);
/* The record header is filled in little-endian form. */
94 rec.lrh_len = cpu_to_le32(sz_med);
95 rec.lrh_type = cpu_to_le32(LLOG_JOIN_REC);
97 CDEBUG(D_INFO, "insert extent "LPU64":"LPU64" lmm \n", start, len);
/* Temporary buffer holding the extent descriptor for the write below. */
99 OBD_ALLOC(med, sz_med);
103 med->med_start = start;
105 memcpy(&med->med_lmm, lmm,
106 lov_mds_md_size(le32_to_cpu(lmm->lmm_stripe_count),
/* Write the record, then release the temporary buffer regardless of rc. */
109 rc = llog_write_rec(llh, &rec, NULL, 0, med, -1);
110 OBD_FREE(med, sz_med);
/* NOTE(review): the stripe counts are added here without le32_to_cpu /
 * cpu_to_le32, unlike the size computations above - confirm this is
 * intended on big-endian hosts. */
113 /*modify lmmj for join stripe info*/
114 lmmj->lmmj_md.lmm_stripe_count += lmm->lmm_stripe_count;
115 lmmj->lmmj_extent_count ++;
/*
 * llog callback (used via mdsea_iterate() from mds_join_file) that copies one
 * extent record of the tail file's log into the head file's log.
 *
 * llh_tail    - log being iterated
 * rec_in_tail - current record; interpreted as a struct llog_array_rec
 * cbdata      - supplies the destination log (mc_llh), the head file size
 *               (mc_headfile_sz) and the join descriptor (mc_lmm_join)
 *
 * Returns LLOG_DEL_RECORD on the path shown at the end of the function.
 */
121 static int mdsea_append_extent(struct llog_handle *llh_tail,
122 struct llog_rec_hdr *rec_in_tail,
123 struct mdsea_cb_data *cbdata)
/* The extent descriptor is embedded in the record as lmr_med. */
125 struct mds_extent_desc *med =
126 &((struct llog_array_rec *)rec_in_tail)->lmr_med;
130 CDEBUG(D_INODE, "insert lmm extent: "LPU64":"LPU64" \n",
131 med->med_start, med->med_len);
/* Re-insert the extent with its start shifted by the head file size, so
 * the tail's extents follow the head's data. */
132 rc = mds_insert_join_lmm(cbdata->mc_llh, &med->med_lmm,
133 med->med_start + cbdata->mc_headfile_sz,
134 med->med_len, cbdata->mc_lmm_join);
136 CERROR("error %d insert the lmm \n", rc);
139 RETURN(LLOG_DEL_RECORD);
/*
 * Initialise a join descriptor from a plain striping descriptor.
 *
 * lmmj  - descriptor to fill in
 * lmm   - source of the object id, object group, pattern and stripe size
 * logid - id of the extent log, copied by value into lmmj_array_id
 *
 * The stripe and extent counters start at zero; mds_insert_join_lmm()
 * increases them as extents are added.
 */
142 static void mds_init_stripe_join(struct lov_mds_md_join *lmmj,
143 struct lov_mds_md *lmm,
144 struct llog_logid *logid)
/* Mark the descriptor as a join layout (magic stored little-endian). */
146 lmmj->lmmj_md.lmm_magic = cpu_to_le32(LOV_MAGIC_JOIN);
/* The following fields are copied as-is, without byte-order conversion. */
147 lmmj->lmmj_md.lmm_object_id = lmm->lmm_object_id;
148 lmmj->lmmj_md.lmm_object_gr = lmm->lmm_object_gr;
149 lmmj->lmmj_md.lmm_pattern = lmm->lmm_pattern;
150 lmmj->lmmj_md.lmm_stripe_size = lmm->lmm_stripe_size;
151 lmmj->lmmj_md.lmm_stripe_count = 0;
152 lmmj->lmmj_extent_count = 0;
153 lmmj->lmmj_array_id = *logid;
/*
 * llog callback (used via mdsea_iterate() from mds_adjust_last_extent) that
 * walks the head file's extents, tracks where the extents seen so far end,
 * and selects records for deletion.
 *
 * llh_tail    - log being iterated
 * rec_in_tail - current record; interpreted as a struct llog_array_rec
 * cbdata      - running state: mc_offset (expected start of this extent),
 *               mc_headfile_sz (head file size) and mc_lmm (saved lmm copy)
 *
 * Returns LLOG_DEL_RECORD for a record that starts beyond the head file size
 * or whose length is -1.
 */
156 static int mdsea_cancel_last_extent(struct llog_handle *llh_tail,
157 struct llog_rec_hdr *rec_in_tail,
158 struct mdsea_cb_data *cbdata)
160 struct mds_extent_desc *med =
161 &((struct llog_array_rec *)rec_in_tail)->lmr_med;
163 CDEBUG(D_INODE, "extent: "LPU64":"LPU64" \n", med->med_start,
/* Extents must be contiguous: each one has to begin exactly where the
 * previous one ended. */
166 LASSERTF(cbdata->mc_offset == med->med_start,
167 "A hole in the extent "LPU64"--"LPU64"\n",
168 cbdata->mc_offset, med->med_start);
/* A length of -1 is a sentinel (presumably "extends to end of file" -
 * confirm); the offset is not advanced past such an extent. */
170 if (med->med_len != -1)
171 cbdata->mc_offset = med->med_start + med->med_len;
173 if (med->med_start > cbdata->mc_headfile_sz || (med->med_len == -1)) {
174 CDEBUG(D_INFO, "del rec offset"LPU64", head size "LPU64" \n",
175 med->med_start, cbdata->mc_headfile_sz);
/* Keep a copy of the lmm of the first such record only; the caller
 * (mds_adjust_last_extent) re-inserts it and frees the copy. */
176 if (!cbdata->mc_lmm) {
177 int stripe = le32_to_cpu(med->med_lmm.lmm_stripe_count);
178 OBD_ALLOC(cbdata->mc_lmm,
179 lov_mds_md_size(stripe, LOV_MAGIC));
182 memcpy(cbdata->mc_lmm, &med->med_lmm,
183 lov_mds_md_size(stripe, LOV_MAGIC));
185 RETURN(LLOG_DEL_RECORD);
/*
 * Rewrite the last extent of an existing join file's log so that it has an
 * explicit length ending at head_size.
 *
 * llh_head  - extent log of the head file
 * head_size - size of the head file (the second parameter is used below
 *             under this name)
 *
 * Iterates the log with mdsea_cancel_last_extent(), then re-inserts the saved
 * lmm as an extent of length head_size - mc_offset.
 */
190 static int mds_adjust_last_extent(struct llog_handle *llh_head,
193 struct mdsea_cb_data *cbdata;
/* The callback state is heap-allocated and released at the end. */
197 OBD_ALLOC_PTR(cbdata);
202 cbdata->mc_headfile_sz = head_size;
203 /*Find the last extent and cancel the record in the lmm*/
204 rc = mdsea_iterate(llh_head, (llog_cb_t)mdsea_cancel_last_extent,
208 CERROR("can not find the last extent rc=%d\n", rc);
/* The iteration must have saved an lmm copy for the re-insert below. */
212 LASSERT(cbdata->mc_lmm);
214 CDEBUG(D_INODE, "insert lmm extent: "LPU64":"LPU64" \n",
215 cbdata->mc_offset, (head_size - cbdata->mc_offset));
217 rc = mds_insert_join_lmm(llh_head, cbdata->mc_lmm,
219 (head_size - cbdata->mc_offset),
222 CERROR("error insert the lmm rc %d \n", rc);
/* NOTE(review): the size freed here uses lmm_stripe_count without
 * le32_to_cpu, whereas mdsea_cancel_last_extent() sized the allocation
 * with le32_to_cpu - confirm the two sizes agree on big-endian hosts. */
224 if (cbdata && cbdata->mc_lmm)
225 OBD_FREE(cbdata->mc_lmm,
226 lov_mds_md_size(cbdata->mc_lmm->lmm_stripe_count,
229 OBD_FREE_PTR(cbdata);
/*
 * Complete the reply for a join request: raise the MDS-wide maximum EA and
 * cookie sizes if the joined layout needs more, report the new maxima in the
 * reply body, and pack the head inode's attributes into the reply.
 *
 * mds   - MDS device whose mds_max_mdsize / mds_max_cookiesize may be raised
 * req   - request whose reply message holds the mds_body
 * inode - inode packed into the reply body
 * lmmj  - join descriptor of the resulting file
 */
234 static void mds_finish_join(struct mds_obd *mds, struct ptlrpc_request *req,
235 struct inode *inode, struct lov_mds_md_join *lmmj)
237 struct mds_body *body = lustre_msg_buf(req->rq_repmsg,DLM_REPLY_REC_OFF,
/* One llog cookie per stripe of the joined file.
 * NOTE(review): lmm_stripe_count is read here without le32_to_cpu. */
239 int max_cookiesize = lmmj->lmmj_md.lmm_stripe_count *
240 sizeof(struct llog_cookie);
241 int max_easize = sizeof(*lmmj);
/* This message is emitted unconditionally, before the comparison below. */
243 CDEBUG(D_INFO, "change the max md size from %d to "LPSZ"\n",
244 mds->mds_max_mdsize, sizeof(*lmmj));
/* If either maximum is too small, set each to the larger of its current
 * and required value, both in the reply body and in the mds, and flag
 * the change for the client with OBD_MD_FLMODEASIZE. */
246 if (mds->mds_max_mdsize < max_easize ||
247 mds->mds_max_cookiesize < max_cookiesize) {
248 body->max_mdsize = mds->mds_max_mdsize > max_easize ?
249 mds->mds_max_mdsize : max_easize;
250 mds->mds_max_mdsize = body->max_mdsize;
251 body->max_cookiesize = mds->mds_max_cookiesize > max_cookiesize?
252 mds->mds_max_cookiesize : max_cookiesize;
253 mds->mds_max_cookiesize = body->max_cookiesize;
254 body->valid |= OBD_MD_FLMODEASIZE;
257 if (body->valid & OBD_MD_FLMODEASIZE)
258 CDEBUG(D_INODE, "updating max_mdsize/max_cookiesize: %d/%d\n",
259 mds->mds_max_mdsize, mds->mds_max_cookiesize);
261 mds_pack_inode2body(body, inode);
/*
 * Lock the head and tail files, fetch the tail file's striping EA into
 * tail_lmm, start the join transaction and unlink the tail file.
 *
 * rec      - update record; ur_name / ur_namelen name the tail file
 * req      - the join request
 * join_rec - join record; jr_fid identifies the tail file's parent
 * tail_lmm - caller-provided buffer that receives the tail file's EA
 * lmm_size - size of tail_lmm
 * dchild   - dentry of the head file
 * handle   - out: transaction handle returned by fsfilt_start()
 * lockh    - in: lock on the head file, dropped on entry;
 *            out: replaced with dlm_handles[1] after relocking
 */
264 static int mds_join_unlink_tail_inode(struct mds_update_record *rec,
265 struct ptlrpc_request *req,
266 struct mds_rec_join *join_rec,
267 struct lov_mds_md *tail_lmm,
268 int lmm_size, struct dentry *dchild,
269 void **handle,struct lustre_handle *lockh)
271 struct mds_obd *mds = mds_req2mds(req);
272 struct obd_device *obd = req->rq_export->exp_obd;
273 struct inode *tail_inode, *head_inode;
274 struct dentry *de_tailparent = NULL, *de_tail = NULL, *de_head = NULL;
275 struct lustre_handle dlm_handles[4] = {{0}, {0}, {0}, {0}};
276 struct ll_fid head_fid;
/* Release the caller's EX lock on the head file; it is reacquired below
 * together with the other locks (presumably to keep a consistent lock
 * order - confirm). */
281 ldlm_lock_decref(lockh, LCK_EX);
283 head_inode = dchild->d_inode;
284 ll_pack_fid(&head_fid, head_inode->i_ino, head_inode->i_generation,
285 head_inode->i_mode & S_IFMT);
/* Look up and lock the tail's parent (by jr_fid), the head (by head_fid)
 * and the tail (by name); the lock handles land in dlm_handles[]. */
287 rc = mds_get_parents_children_locked(obd, mds, &join_rec->jr_fid,
288 &de_tailparent, &head_fid,
289 &de_head, LCK_EX, rec->ur_name,
290 rec->ur_namelen, &de_tail,
291 NULL, 0, NULL, dlm_handles,
/* Hand the new lock in slot 1 back to the caller through lockh. */
296 *lockh = dlm_handles[1];
297 LASSERT(de_tailparent);
298 tail_inode = de_tail->d_inode;
299 if (tail_inode == NULL) {
300 CERROR("tail inode doesn't exist(dir %lu,name %s)!\n",
301 de_tailparent? de_tailparent->d_inode->i_ino : 0,
303 GOTO(cleanup, rc = -ENOENT);
/* Only regular files can be joined as the tail. */
306 if (!S_ISREG(tail_inode->i_mode)) {
307 CERROR("tail file is not a regular file (dir %lu, name %s)!\n",
308 de_tailparent? de_tailparent->d_inode->i_ino : 0,
310 GOTO(cleanup, rc = -EINVAL);
/* Start the transaction on the head inode; the handle is returned to the
 * caller, which finishes it. */
313 *handle = fsfilt_start(obd, head_inode, FSFILT_OP_JOIN, NULL);
314 if (IS_ERR(*handle)) {
315 rc = PTR_ERR(*handle);
/* Read the tail's striping EA before the file is unlinked. */
319 rc = mds_get_md(obd, tail_inode, tail_lmm, &lmm_size, 1, 0,
320 req->rq_export->exp_connect_flags);
321 if (rc < 0) /* get md fails */
/* The tail may be a plain striped file or already a join file. */
324 LASSERT(le32_to_cpu(tail_lmm->lmm_magic) == LOV_MAGIC_JOIN ||
325 le32_to_cpu(tail_lmm->lmm_magic) == LOV_MAGIC);
/* Unlink the tail while holding the parent directory's inode mutex. */
327 LASSERT(de_tailparent);
328 LOCK_INODE_MUTEX(de_tailparent->d_inode);
329 rc = ll_vfs_unlink(de_tailparent->d_inode, de_tail, mds->mds_vfsmnt);
330 UNLOCK_INODE_MUTEX(de_tailparent->d_inode);
333 CDEBUG(D_INODE, "delete the tail inode %lu/%u \n",
334 tail_inode->i_ino, tail_inode->i_generation);
/* Lock release: slot 2 is dropped if it was taken. */
337 if (dlm_handles[2].cookie != 0)
338 ldlm_lock_decref(&dlm_handles[2], LCK_EX);
/* Slot 0 is either dropped or saved on the request with
 * ptlrpc_save_lock() - presumably chosen by rc; confirm. */
340 if (dlm_handles[0].cookie != 0) {
342 ldlm_lock_decref(&dlm_handles[0], LCK_EX);
344 ptlrpc_save_lock(req, &dlm_handles[0], LCK_EX);
350 l_dput(de_tailparent);
/*
 * Join a tail file onto a head file.
 *
 * rec     - update record of the request (fid1 is the head, ur_name the tail)
 * req     - the join request; the mds_rec_join is read from its buffers
 * de_head - dentry of the head file
 * lockh   - lock handle on the head file, passed on to
 *           mds_join_unlink_tail_inode()
 *
 * Outline of the steps shown below:
 *  1. unlink the tail file and fetch its striping EA;
 *  2. open the head's extent log, creating it if the head is a plain file;
 *  3. add the tail's extent(s) to the head's log, shifted by jr_headsize;
 *  4. store the join descriptor as the head inode's "lov" EA and finish the
 *     reply and the transaction.
 */
358 int mds_join_file(struct mds_update_record *rec, struct ptlrpc_request *req,
359 struct dentry *de_head, struct lustre_handle *lockh)
361 struct mds_obd *mds = mds_req2mds(req);
362 struct obd_device *obd = req->rq_export->exp_obd;
363 struct inode *inodes[PTLRPC_NUM_VERSIONS] = { NULL };
364 struct inode *head_inode = NULL;
365 struct lvfs_run_ctxt saved;
367 struct lov_mds_md *head_lmm, *tail_lmm;
368 struct lov_mds_md_join *head_lmmj = NULL, *tail_lmmj = NULL;
369 int lmm_size, rc = 0, cleanup_phase = 0, size;
370 struct llog_handle *llh_head = NULL, *llh_tail = NULL;
371 struct llog_ctxt *ctxt = NULL;
372 struct mds_rec_join *join_rec;
/* Fetch the join record from the request buffer, byte-swapping it with
 * lustre_swab_mds_rec_join as needed. */
375 join_rec = lustre_swab_reqbuf(req, DLM_INTENT_REC_OFF + 3,
377 lustre_swab_mds_rec_join);
378 if (join_rec == NULL)
381 DEBUG_REQ(D_INODE, req,"head "LPU64"/%u, ptail ino "LPU64"/%u, tail %s",
382 rec->ur_fid1->id, rec->ur_fid1->generation,
383 join_rec->jr_fid.id, join_rec->jr_fid.generation,
/* Both EA buffers are sized to the current maximum EA size.  lmm_size is
 * kept as the allocation size for the OBD_FREE calls at the end; size is
 * a separate copy handed by pointer to mds_get_md() (which presumably
 * may update it - confirm). */
386 size = mds->mds_max_mdsize;
387 lmm_size = mds->mds_max_mdsize;
388 OBD_ALLOC(head_lmm, lmm_size);
389 OBD_ALLOC(tail_lmm, lmm_size);
390 if (!head_lmm || !tail_lmm)
391 GOTO(cleanup, rc = -ENOMEM);
393 /* acquire head's dentry */
395 head_inode = de_head->d_inode;
396 if (head_inode == NULL) {
397 CERROR("head inode doesn't exist!\n");
398 GOTO(cleanup, rc = -ENOENT);
401 /*Unlink tail inode and get the lmm back*/
402 rc = mds_join_unlink_tail_inode(rec, req, join_rec, tail_lmm, lmm_size,
403 de_head, &handle, lockh);
405 CERROR("unlink tail_inode error %d\n", rc);
/* The head inode's mutex is held while its EA is read and replaced; it is
 * released in the cleanup code below. */
409 LOCK_INODE_MUTEX(head_inode);
411 rc = mds_get_md(obd, head_inode, head_lmm, &size, 0, 0,
412 req->rq_export->exp_connect_flags);
/* The head may be a plain striped file or already a join file. */
416 LASSERT(le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC_JOIN ||
417 le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC);
/* Switch to the obd's lvfs context for the llog operations; undone by
 * pop_ctxt() in the cleanup code. */
419 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
420 ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
421 LASSERT(ctxt != NULL);
/* Case 1: the head is a plain file.  Create a fresh extent log, build a
 * new join descriptor and record the head's own layout as the first
 * extent, starting at offset 0 with length jr_headsize. */
423 if (le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC) { /*simple file */
424 struct llog_logid *llog_array;
426 rc = llog_create(ctxt, &llh_head, NULL, NULL);
428 CERROR("cannot create new log, error = %d\n", rc);
432 llog_array = &llh_head->lgh_id;
433 CDEBUG(D_INFO,"create arrary for %lu with id "LPU64":"LPU64"\n",
434 head_inode->i_ino, llog_array->lgl_oid,
435 llog_array->lgl_ogr);
436 rc = llog_init_handle(llh_head, LLOG_F_IS_PLAIN, NULL);
439 OBD_ALLOC_PTR(head_lmmj);
440 if (head_lmmj == NULL)
441 GOTO(cleanup, rc = -ENOMEM);
442 mds_init_stripe_join(head_lmmj, head_lmm, llog_array);
/* NOTE(review): the return value of this insert is not captured. */
443 mds_insert_join_lmm(llh_head, head_lmm, 0,join_rec->jr_headsize,
/* Case 2: the head is already a join file.  Its EA buffer is the join
 * descriptor, so head_lmmj aliases head_lmm; open the existing extent
 * log and fix up its last extent to end at jr_headsize. */
445 } else { /*head lmm is join file */
446 head_lmmj = (struct lov_mds_md_join *)head_lmm;
447 /* construct and fill extent llog object */
448 rc = llog_create(ctxt, &llh_head,
449 &head_lmmj->lmmj_array_id, NULL);
451 CERROR("cannot open existing log, error = %d\n", rc);
455 rc = llog_init_handle(llh_head, LLOG_F_IS_PLAIN, NULL);
458 rc = mds_adjust_last_extent(llh_head, join_rec->jr_headsize);
460 CERROR("can't adjust last extent of obj rc=%d\n", rc);
/* Now add the tail.  A plain tail contributes a single extent starting at
 * jr_headsize.
 * NOTE(review): the return value of this insert is not captured. */
465 if (le32_to_cpu(tail_lmm->lmm_magic) != LOV_MAGIC_JOIN) {
466 mds_insert_join_lmm(llh_head, tail_lmm, join_rec->jr_headsize,
/* A tail that is itself a join file has its own extent log: open it, copy
 * every extent into the head's log via mdsea_append_extent, then destroy
 * the tail's log and free its handle. */
469 struct mdsea_cb_data cbdata;
470 tail_lmmj = (struct lov_mds_md_join *)tail_lmm;
472 rc = llog_create(ctxt,&llh_tail,&tail_lmmj->lmmj_array_id,NULL);
474 CERROR("cannot open existing log, error = %d\n", rc);
477 rc = llog_init_handle(llh_tail, LLOG_F_IS_PLAIN, NULL);
479 llog_close(llh_tail);
482 cbdata.mc_llh = llh_head;
483 cbdata.mc_headfile_sz = join_rec->jr_headsize;
484 cbdata.mc_lmm_join = head_lmmj;
485 rc = mdsea_iterate(llh_tail, (llog_cb_t)mdsea_append_extent,
488 llog_close(llh_tail);
489 CERROR("can not append extent log error %d \n", rc);
492 rc = llog_destroy(llh_tail);
494 llog_close(llh_tail);
495 CERROR("can not destroy log error %d \n", rc);
498 llog_free_handle(llh_tail);
/* Store the join descriptor as the head inode's "lov" EA inside the
 * transaction started by mds_join_unlink_tail_inode().
 * NOTE(review): the return value of fsfilt_set_md() is not captured. */
501 CDEBUG(D_INODE, "join finish, set lmm V2 to inode %lu \n",
503 fsfilt_set_md(obd, head_inode, handle, head_lmmj,
504 sizeof(struct lov_mds_md_join), "lov");
505 mds_finish_join(mds, req, head_inode, head_lmmj);
/* Finish the transaction; rc is passed in and replaced by the result. */
507 inodes[0] = head_inode;
508 rc = mds_finish_transno(mds, inodes, handle, req, rc, 0, 0);
/* Staged cleanup selected by cleanup_phase. */
509 switch(cleanup_phase){
511 llog_close(llh_head);
/* head_lmmj is freed only when it was allocated separately (plain-head
 * case); otherwise it aliases head_lmm, which is freed below. */
514 if (head_lmmj && ((void*)head_lmmj != (void*)head_lmm))
515 OBD_FREE_PTR(head_lmmj);
517 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
519 UNLOCK_INODE_MUTEX(head_inode);
/* Both EA buffers were allocated with lmm_size bytes. */
521 if (tail_lmm != NULL)
522 OBD_FREE(tail_lmm, lmm_size);
523 if (head_lmm != NULL)
524 OBD_FREE(head_lmm, lmm_size);
527 CERROR("invalid cleanup_phase %d\n", cleanup_phase);