码迷,mamicode.com
首页 > 数据库 > 详细

PostgreSQL中流复制pg_basebackup做了什么

时间:2015-08-10 21:54:06      阅读:182      评论:0      收藏:0      [点我收藏+]

标签:

解压 PostgreSQL 源代码包后,在路径 postgresql-9.2.4\src\backend\replication 下可以看到 basebackup.c,此外还有 walreceiver.c 和 walsender.c。

basebackup做了什么一目了然:

/*-------------------------------------------------------------------------
*
* basebackup.c
* code for taking a base backup and streaming it to a standby
*
* Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/basebackup.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <time.h>

#include "access/xlog_internal.h" /* for pg_start/stop_backup */
#include "catalog/catalog.h"
#include "catalog/pg_type.h"
#include "lib/stringinfo.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "nodes/pg_list.h"
#include "replication/basebackup.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"

/*
 * Options controlling one base backup, as filled in by
 * parse_basebackup_options() from the option list of the command.
 */
typedef struct
{
/* backup label; parse_basebackup_options() defaults it to "base backup" */
const char *label;
/* "progress" option: sizes are computed up front and sent in the header */
bool progress;
/* "fast" option: passed through to do_pg_start_backup() */
bool fastcheckpoint;
/* "nowait" option: do_pg_stop_backup() is called with !nowait */
bool nowait;
/* "wal" option: the needed WAL files are appended to the main tar stream */
bool includewal;
} basebackup_options;


/* Forward declarations of the file-local routines defined further down. */
static int64 sendDir(char *path, int basepathlen, bool sizeonly);
static int64 sendTablespace(char *path, bool sizeonly);
static bool sendFile(char *readfilename, char *tarfilename,
struct stat * statbuf, bool missing_ok);
static void sendFileWithContent(const char *filename, const char *content);
static void _tarWriteHeader(const char *filename, const char *linktarget,
struct stat * statbuf);
static void send_int8_string(StringInfoData *buf, int64 intval);
static void SendBackupHeader(List *tablespaces);
static void base_backup_cleanup(int code, Datum arg);
static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
static void parse_basebackup_options(List *options, basebackup_options *opt);
static void SendXlogRecPtrResult(XLogRecPtr ptr);
static int compareWalFileNames(const void *a, const void *b);

/*
 * Size of each block sent into the tar stream for larger files.
 *
 * This is the size of the read buffer used by sendFile() and by the
 * WAL-sending loop in perform_base_backup(); each chunk read into that
 * buffer is sent as one CopyData message.
 */
#define TAR_SEND_SIZE 32768

/*
 * One entry per tar stream sent by perform_base_backup(): either a
 * tablespace found in pg_tblspc, or the base data directory (allocated
 * with palloc0, so its oid and path are NULL).
 */
typedef struct
{
/* name of the entry in pg_tblspc; NULL for the base directory */
char *oid;
/* target of the pg_tblspc symbolic link; NULL for the base directory */
char *path;
/* size from a size-only pass when "progress" was requested, else -1 */
int64 size;
} tablespaceinfo;


/*
 * Called when ERROR or FATAL happens in perform_base_backup() after
 * we have started the backup - make sure we end it!
 *
 * Registered through PG_ENSURE_ERROR_CLEANUP in perform_base_backup().
 * Both arguments are ignored; the function only calls do_pg_abort_backup().
 */
static void
base_backup_cleanup(int code, Datum arg)
{
do_pg_abort_backup();
}

/*
 * Actually do a base backup for the specified tablespaces.
 *
 * This is split out mainly to avoid complaints about "variable might be
 * clobbered by longjmp" from stupider versions of gcc.
 *
 * 'opt' holds the parsed backup options.  'tblspcdir' is an already-open
 * handle on pg_tblspc; it is scanned here to discover the tablespaces and
 * is not closed by this function.
 *
 * The sequence of messages produced is:
 *	1. the backup start position (SendXlogRecPtrResult)
 *	2. the tablespace list (SendBackupHeader)
 *	3. one tar stream per tablespace, the base data directory last
 *	4. if opt->includewal, the required WAL segments and all timeline
 *	   history files, appended to the base directory's tar stream
 *	5. the backup end position (SendXlogRecPtrResult)
 *
 * The file-sending phase runs under PG_ENSURE_ERROR_CLEANUP so that
 * base_backup_cleanup() aborts the backup if an error is raised there.
 */
static void
perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
{
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;
	char	   *labelfile;

	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
	SendXlogRecPtrResult(startptr);

	PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
	{
		List	   *tablespaces = NIL;
		ListCell   *lc;
		struct dirent *de;
		tablespaceinfo *ti;

		/* Collect information about all tablespaces */
		while ((de = ReadDir(tblspcdir, "pg_tblspc")) != NULL)
		{
			char		fullpath[MAXPGPATH];
			char		linkpath[MAXPGPATH];
			int			rllen;

			/* Skip special stuff */
			if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
				continue;

			snprintf(fullpath, sizeof(fullpath), "pg_tblspc/%s", de->d_name);

#if defined(HAVE_READLINK) || defined(WIN32)
			rllen = readlink(fullpath, linkpath, sizeof(linkpath));
			if (rllen < 0)
			{
				ereport(WARNING,
						(errmsg("could not read symbolic link \"%s\": %m",
								fullpath)));
				continue;
			}
			else if (rllen >= sizeof(linkpath))
			{
				ereport(WARNING,
						(errmsg("symbolic link \"%s\" target is too long",
								fullpath)));
				continue;
			}
			/* readlink() does not null-terminate its result */
			linkpath[rllen] = '\0';

			ti = palloc(sizeof(tablespaceinfo));
			ti->oid = pstrdup(de->d_name);
			ti->path = pstrdup(linkpath);
			ti->size = opt->progress ? sendTablespace(fullpath, true) : -1;
			tablespaces = lappend(tablespaces, ti);
#else

			/*
			 * If the platform does not have symbolic links, it should not be
			 * possible to have tablespaces - clearly somebody else created
			 * them. Warn about it and ignore.
			 */
			ereport(WARNING,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("tablespaces are not supported on this platform")));
#endif
		}

		/* Add a node for the base directory at the end */
		ti = palloc0(sizeof(tablespaceinfo));
		ti->size = opt->progress ? sendDir(".", 1, true) : -1;
		tablespaces = lappend(tablespaces, ti);

		/* Send tablespace header */
		SendBackupHeader(tablespaces);

		/* Send off our tablespaces one by one */
		foreach(lc, tablespaces)
		{
			tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
			StringInfoData buf;

			/* Send CopyOutResponse message */
			pq_beginmessage(&buf, 'H');
			pq_sendbyte(&buf, 0);		/* overall format */
			pq_sendint(&buf, 0, 2);		/* natts */
			pq_endmessage(&buf);

			if (ti->path == NULL)
			{
				struct stat statbuf;

				/* In the main tar, include the backup_label first... */
				sendFileWithContent(BACKUP_LABEL_FILE, labelfile);

				/* ... then the bulk of the files ... */
				sendDir(".", 1, false);

				/* ... and pg_control after everything else. */
				if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not stat control file \"%s\": %m",
									XLOG_CONTROL_FILE)));
				sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false);
			}
			else
				sendTablespace(ti->path, false);

			/*
			 * If we're including WAL, and this is the main data directory we
			 * don't terminate the tar stream here. Instead, we will append
			 * the xlog files below and terminate it then. This is safe since
			 * the main data directory is always sent *last*.
			 */
			if (opt->includewal && ti->path == NULL)
			{
				Assert(lnext(lc) == NULL);
			}
			else
				pq_putemptymessage('c');	/* CopyDone */
		}
	}
	PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);

	endptr = do_pg_stop_backup(labelfile, !opt->nowait);

	if (opt->includewal)
	{
		/*
		 * We've left the last tar file "open", so we can now append the
		 * required WAL files to it.
		 */
		char		pathbuf[MAXPGPATH];
		uint32		logid,
					logseg;
		uint32		startlogid,
					startlogseg;
		uint32		endlogid,
					endlogseg;
		struct stat statbuf;
		List	   *historyFileList = NIL;
		List	   *walFileList = NIL;
		char	  **walFiles;
		int			nWalFiles;
		char		firstoff[MAXFNAMELEN];
		char		lastoff[MAXFNAMELEN];
		DIR		   *dir;
		struct dirent *de;
		int			i;
		ListCell   *lc;
		TimeLineID	tli;

		/*
		 * I'd rather not worry about timelines here, so scan pg_xlog and
		 * include all WAL files in the range between 'startptr' and 'endptr',
		 * regardless of the timeline the file is stamped with. If there are
		 * some spurious WAL files belonging to timelines that don't belong
		 * in this server's history, they will be included too. Normally there
		 * shouldn't be such files, but if there are, there's little harm in
		 * including them.
		 */
		XLByteToSeg(startptr, startlogid, startlogseg);
		XLogFileName(firstoff, ThisTimeLineID, startlogid, startlogseg);
		XLByteToPrevSeg(endptr, endlogid, endlogseg);
		XLogFileName(lastoff, ThisTimeLineID, endlogid, endlogseg);

		dir = AllocateDir("pg_xlog");
		if (!dir)
			ereport(ERROR,
					(errmsg("could not open directory \"%s\": %m", "pg_xlog")));
		while ((de = ReadDir(dir, "pg_xlog")) != NULL)
		{
			/*
			 * Does it look like a WAL segment, and is it in the range?  The
			 * "+ 8" skips the timeline part of the name so that only the
			 * log/seg portion is compared.
			 */
			if (strlen(de->d_name) == 24 &&
				strspn(de->d_name, "0123456789ABCDEF") == 24 &&
				strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
				strcmp(de->d_name + 8, lastoff + 8) <= 0)
			{
				walFileList = lappend(walFileList, pstrdup(de->d_name));
			}
			/* Does it look like a timeline history file? */
			else if (strlen(de->d_name) == 8 + strlen(".history") &&
					 strspn(de->d_name, "0123456789ABCDEF") == 8 &&
					 strcmp(de->d_name + 8, ".history") == 0)
			{
				historyFileList = lappend(historyFileList, pstrdup(de->d_name));
			}
		}
		FreeDir(dir);

		/*
		 * Before we go any further, check that none of the WAL segments we
		 * need were removed.
		 */
		CheckXLogRemoved(startlogid, startlogseg, ThisTimeLineID);

		/*
		 * Put the WAL filenames into an array, and sort. We send the files
		 * in order from oldest to newest, to reduce the chance that a file
		 * is recycled before we get a chance to send it over.
		 */
		nWalFiles = list_length(walFileList);
		walFiles = palloc(nWalFiles * sizeof(char *));
		i = 0;
		foreach(lc, walFileList)
		{
			walFiles[i++] = lfirst(lc);
		}
		qsort(walFiles, nWalFiles, sizeof(char *), compareWalFileNames);

		/*
		 * There must be at least one WAL file in pg_xlog, since we are doing
		 * a backup that includes WAL.  Without this check the sanity checks
		 * below would read walFiles[0] from an empty array.
		 */
		if (nWalFiles < 1)
			ereport(ERROR,
					(errmsg("could not find any WAL files")));

		/*
		 * Sanity check: the first and last segment should cover startptr and
		 * endptr, with no gaps in between.
		 */
		XLogFromFileName(walFiles[0], &tli, &logid, &logseg);
		if (logid != startlogid || logseg != startlogseg)
		{
			char		startfname[MAXFNAMELEN];

			XLogFileName(startfname, ThisTimeLineID, startlogid, startlogseg);
			ereport(ERROR,
					(errmsg("could not find WAL file \"%s\"", startfname)));
		}
		for (i = 0; i < nWalFiles; i++)
		{
			int			currlogid = logid,
						currlogseg = logseg;
			int			nextlogid = logid,
						nextlogseg = logseg;

			NextLogSeg(nextlogid, nextlogseg);

			/*
			 * Each file must be either the successor of the previous one, or
			 * the same segment again (stamped with a different timeline).
			 */
			XLogFromFileName(walFiles[i], &tli, &logid, &logseg);
			if (!((nextlogid == logid && nextlogseg == logseg) ||
				  (currlogid == logid && currlogseg == logseg)))
			{
				char		nextfname[MAXFNAMELEN];

				XLogFileName(nextfname, ThisTimeLineID, nextlogid, nextlogseg);
				ereport(ERROR,
						(errmsg("could not find WAL file \"%s\"", nextfname)));
			}
		}
		if (logid != endlogid || logseg != endlogseg)
		{
			char		endfname[MAXFNAMELEN];

			XLogFileName(endfname, ThisTimeLineID, endlogid, endlogseg);
			ereport(ERROR,
					(errmsg("could not find WAL file \"%s\"", endfname)));
		}

		/* Ok, we have everything we need. Send the WAL files. */
		for (i = 0; i < nWalFiles; i++)
		{
			FILE	   *fp;
			char		buf[TAR_SEND_SIZE];
			size_t		cnt;
			pgoff_t		len = 0;

			snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFiles[i]);
			XLogFromFileName(walFiles[i], &tli, &logid, &logseg);

			fp = AllocateFile(pathbuf, "rb");
			if (fp == NULL)
			{
				/*
				 * Most likely reason for this is that the file was already
				 * removed by a checkpoint, so check for that to get a better
				 * error message.
				 */
				CheckXLogRemoved(logid, logseg, tli);

				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m", pathbuf)));
			}

			if (fstat(fileno(fp), &statbuf) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not stat file \"%s\": %m",
								pathbuf)));
			if (statbuf.st_size != XLogSegSize)
			{
				CheckXLogRemoved(logid, logseg, tli);
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
			}

			_tarWriteHeader(pathbuf, NULL, &statbuf);

			while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
			{
				CheckXLogRemoved(logid, logseg, tli);
				/* Send the chunk as a CopyData message */
				if (pq_putmessage('d', buf, cnt))
					ereport(ERROR,
							(errmsg("base backup could not send data, aborting backup")));

				len += cnt;
				if (len == XLogSegSize)
					break;
			}

			if (len != XLogSegSize)
			{
				CheckXLogRemoved(logid, logseg, tli);
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
			}

			/* XLogSegSize is a multiple of 512, so no need for padding */
			FreeFile(fp);
		}

		/*
		 * Send timeline history files too. Only the latest timeline history
		 * file is required for recovery, and even that only if there happens
		 * to be a timeline switch in the first WAL segment that contains the
		 * checkpoint record, or if we're taking a base backup from a standby
		 * server and the target timeline changes while the backup is taken.
		 * But they are small and highly useful for debugging purposes, so
		 * better include them all, always.
		 */
		foreach(lc, historyFileList)
		{
			char	   *fname = lfirst(lc);

			snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);

			if (lstat(pathbuf, &statbuf) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not stat file \"%s\": %m", pathbuf)));

			sendFile(pathbuf, pathbuf, &statbuf, false);
		}

		/* Send CopyDone message for the last tar file */
		pq_putemptymessage('c');
	}
	SendXlogRecPtrResult(endptr);
}

/*
 * qsort comparator for an array of WAL segment file names (char *).
 *
 * The first 8 characters of each name (the timeline portion) are skipped,
 * so names are ordered by their log/seg portion only.  Returns the
 * strcmp() result of the two remainders.
 */
static int
compareWalFileNames(const void *a, const void *b)
{
	const char *const *lhs = (const char *const *) a;
	const char *const *rhs = (const char *const *) b;

	return strcmp(*lhs + 8, *rhs + 8);
}

/*
 * Translate the option list passed down by the parser into a
 * basebackup_options struct.
 *
 * '*opt' is zeroed first.  The recognized option names are "label",
 * "progress", "fast", "nowait" and "wal"; giving any of them twice raises
 * a syntax error, and any other name raises an error as well.  If no label
 * was supplied, it defaults to "base backup".
 */
static void
parse_basebackup_options(List *options, basebackup_options *opt)
{
	bool		seen_label = false;
	bool		seen_progress = false;
	bool		seen_fast = false;
	bool		seen_nowait = false;
	bool		seen_wal = false;
	ListCell   *cell;

	MemSet(opt, 0, sizeof(*opt));

	foreach(cell, options)
	{
		DefElem    *def = (DefElem *) lfirst(cell);

		if (strcmp(def->defname, "label") == 0)
		{
			if (seen_label)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("duplicate option \"%s\"", def->defname)));
			seen_label = true;
			opt->label = strVal(def->arg);
		}
		else if (strcmp(def->defname, "progress") == 0)
		{
			if (seen_progress)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("duplicate option \"%s\"", def->defname)));
			seen_progress = true;
			opt->progress = true;
		}
		else if (strcmp(def->defname, "fast") == 0)
		{
			if (seen_fast)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("duplicate option \"%s\"", def->defname)));
			seen_fast = true;
			opt->fastcheckpoint = true;
		}
		else if (strcmp(def->defname, "nowait") == 0)
		{
			if (seen_nowait)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("duplicate option \"%s\"", def->defname)));
			seen_nowait = true;
			opt->nowait = true;
		}
		else if (strcmp(def->defname, "wal") == 0)
		{
			if (seen_wal)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("duplicate option \"%s\"", def->defname)));
			seen_wal = true;
			opt->includewal = true;
		}
		else
			elog(ERROR, "option \"%s\" not recognized",
				 def->defname);
	}

	/* Fall back to a fixed label when the client did not give one. */
	if (opt->label == NULL)
		opt->label = "base backup";
}


/*
 * SendBaseBackup() - send a complete base backup.
 *
 * The function will put the system into backup mode like pg_start_backup()
 * does, so that the backup is consistent even though we read directly from
 * the filesystem, bypassing the buffer cache.
 *
 * All work is done in a dedicated memory context, which is deleted again
 * once perform_base_backup() has returned.
 */
void
SendBaseBackup(BaseBackupCmd *cmd)
{
	basebackup_options opt;
	MemoryContext backup_context;
	MemoryContext caller_context;
	DIR		   *tblspcdir;

	parse_basebackup_options(cmd->options, &opt);

	backup_context = AllocSetContextCreate(CurrentMemoryContext,
										   "Streaming base backup context",
										   ALLOCSET_DEFAULT_MINSIZE,
										   ALLOCSET_DEFAULT_INITSIZE,
										   ALLOCSET_DEFAULT_MAXSIZE);
	caller_context = MemoryContextSwitchTo(backup_context);

	WalSndSetState(WALSNDSTATE_BACKUP);

	/* Advertise the backup label in the process title, if enabled */
	if (update_process_title)
	{
		char		title[50];

		snprintf(title, sizeof(title), "sending backup \"%s\"",
				 opt.label);
		set_ps_display(title, false);
	}

	/* Make sure we can open the directory with tablespaces in it */
	tblspcdir = AllocateDir("pg_tblspc");
	if (!tblspcdir)
		ereport(ERROR,
				(errmsg("could not open directory \"%s\": %m", "pg_tblspc")));

	perform_base_backup(&opt, tblspcdir);

	FreeDir(tblspcdir);

	/* Return to the caller's context and release everything we allocated */
	MemoryContextSwitchTo(caller_context);
	MemoryContextDelete(backup_context);
}

/*
 * Append 'intval' to the message in 'buf' as a length-prefixed decimal
 * string: a 4-byte length followed by the digits, with no terminator.
 */
static void
send_int8_string(StringInfoData *buf, int64 intval)
{
	char		digits[32];
	size_t		ndigits;

	sprintf(digits, INT64_FORMAT, intval);
	ndigits = strlen(digits);

	pq_sendint(buf, ndigits, 4);
	pq_sendbytes(buf, digits, ndigits);
}

/*
 * Send the result set describing the tablespaces about to be streamed.
 *
 * 'tablespaces' is a List of tablespaceinfo pointers.  A RowDescription
 * with three columns (spcoid, spclocation, size) is sent, followed by one
 * DataRow per list entry and a CommandComplete message.
 *
 * For an entry whose path is NULL (the base data directory), spcoid and
 * spclocation are sent as NULL.  The size column is sent as the entry's
 * size divided by 1024, or as NULL when the size is negative.
 */
static void
SendBackupHeader(List *tablespaces)
{
	StringInfoData buf;
	ListCell   *lc;

	/* Construct and send the directory information */
	pq_beginmessage(&buf, 'T'); /* RowDescription */
	pq_sendint(&buf, 3, 2);		/* 3 fields */

	/* First field - spcoid */
	pq_sendstring(&buf, "spcoid");
	pq_sendint(&buf, 0, 4);		/* table oid */
	pq_sendint(&buf, 0, 2);		/* attnum */
	pq_sendint(&buf, OIDOID, 4);	/* type oid */
	pq_sendint(&buf, 4, 2);		/* typlen */
	pq_sendint(&buf, 0, 4);		/* typmod */
	pq_sendint(&buf, 0, 2);		/* format code */

	/* Second field - spclocation */
	pq_sendstring(&buf, "spclocation");
	pq_sendint(&buf, 0, 4);		/* table oid */
	pq_sendint(&buf, 0, 2);		/* attnum */
	pq_sendint(&buf, TEXTOID, 4);	/* type oid */
	pq_sendint(&buf, -1, 2);	/* typlen */
	pq_sendint(&buf, 0, 4);		/* typmod */
	pq_sendint(&buf, 0, 2);		/* format code */

	/* Third field - size */
	pq_sendstring(&buf, "size");
	pq_sendint(&buf, 0, 4);		/* table oid */
	pq_sendint(&buf, 0, 2);		/* attnum */
	pq_sendint(&buf, INT8OID, 4);	/* type oid */
	pq_sendint(&buf, 8, 2);		/* typlen */
	pq_sendint(&buf, 0, 4);		/* typmod */
	pq_sendint(&buf, 0, 2);		/* format code */
	pq_endmessage(&buf);

	foreach(lc, tablespaces)
	{
		tablespaceinfo *ti = lfirst(lc);

		/* Send one datarow message */
		pq_beginmessage(&buf, 'D');
		pq_sendint(&buf, 3, 2); /* number of columns */
		if (ti->path == NULL)
		{
			pq_sendint(&buf, -1, 4);	/* Length = -1 ==> NULL */
			pq_sendint(&buf, -1, 4);
		}
		else
		{
			pq_sendint(&buf, strlen(ti->oid), 4);		/* length */
			pq_sendbytes(&buf, ti->oid, strlen(ti->oid));
			pq_sendint(&buf, strlen(ti->path), 4);		/* length */
			pq_sendbytes(&buf, ti->path, strlen(ti->path));
		}
		if (ti->size >= 0)
			send_int8_string(&buf, ti->size / 1024);
		else
			pq_sendint(&buf, -1, 4);	/* NULL */

		pq_endmessage(&buf);
	}

	/* Send a CommandComplete message */
	pq_puttextmessage('C', "SELECT");
}

/*
 * Send a single resultset containing just a single
 * XlogRecPtr record (in text format)
 *
 * The value is formatted as "%X/%X" from ptr.xlogid and ptr.xrecoff and
 * sent in a one-column row named "recptr", followed by a CommandComplete
 * message.
 */
static void
SendXlogRecPtrResult(XLogRecPtr ptr)
{
	StringInfoData buf;
	char		str[MAXFNAMELEN];

	snprintf(str, sizeof(str), "%X/%X", ptr.xlogid, ptr.xrecoff);

	pq_beginmessage(&buf, 'T'); /* RowDescription */
	pq_sendint(&buf, 1, 2);		/* 1 field */

	/* Field header */
	pq_sendstring(&buf, "recptr");
	pq_sendint(&buf, 0, 4);		/* table oid */
	pq_sendint(&buf, 0, 2);		/* attnum */
	pq_sendint(&buf, TEXTOID, 4);	/* type oid */
	pq_sendint(&buf, -1, 2);	/* typlen */
	pq_sendint(&buf, 0, 4);		/* typmod */
	pq_sendint(&buf, 0, 2);		/* format code */
	pq_endmessage(&buf);

	/* Data row */
	pq_beginmessage(&buf, 'D');
	pq_sendint(&buf, 1, 2);		/* number of columns */
	pq_sendint(&buf, strlen(str), 4);	/* length */
	pq_sendbytes(&buf, str, strlen(str));
	pq_endmessage(&buf);

	/* Send a CommandComplete message */
	pq_puttextmessage('C', "SELECT");
}

/*
 * Inject a file with given name and content in the output tar stream.
 *
 * 'content' is a null-terminated string; its strlen() is used as the file
 * size.  A tar header is written from a synthesized stat struct (current
 * effective uid/gid, current time, owner read/write mode), then the
 * content and any padding are sent as CopyData messages.
 */
static void
sendFileWithContent(const char *filename, const char *content)
{
	struct stat statbuf;
	int			pad,
				len;

	len = strlen(content);

	/*
	 * Construct a stat struct for the backup_label file we're injecting in
	 * the tar.
	 */
	/* Windows doesn't have the concept of uid and gid */
#ifdef WIN32
	statbuf.st_uid = 0;
	statbuf.st_gid = 0;
#else
	statbuf.st_uid = geteuid();
	statbuf.st_gid = getegid();
#endif
	statbuf.st_mtime = time(NULL);
	statbuf.st_mode = S_IRUSR | S_IWUSR;
	statbuf.st_size = len;

	_tarWriteHeader(filename, NULL, &statbuf);
	/* Send the contents as a CopyData message */
	pq_putmessage('d', content, len);

	/* Pad to 512 byte boundary, per tar format requirements */
	pad = ((len + 511) & ~511) - len;
	if (pad > 0)
	{
		char		buf[512];

		MemSet(buf, 0, pad);
		pq_putmessage('d', buf, pad);
	}
}

/*
 * Include the tablespace directory pointed to by 'path' in the output tar
 * stream. If 'sizeonly' is true, we just calculate a total length and return
 * it, without actually sending anything.
 *
 * Only the TABLESPACE_VERSION_DIRECTORY subdirectory of 'path' is included.
 * Returns 0 if that subdirectory does not exist; otherwise returns 512 for
 * the directory's own tar header plus whatever sendDir() reports for its
 * contents.
 */
static int64
sendTablespace(char *path, bool sizeonly)
{
	char		verdir[MAXPGPATH];
	struct stat verdir_stat;

	/*
	 * 'path' points to the tablespace location, but we only want to include
	 * the version directory in it that belongs to us.
	 */
	snprintf(verdir, sizeof(verdir), "%s/%s", path,
			 TABLESPACE_VERSION_DIRECTORY);

	if (lstat(verdir, &verdir_stat) != 0)
	{
		if (errno != ENOENT)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not stat file or directory \"%s\": %m",
							verdir)));

		/* If the tablespace went away while scanning, it's no error. */
		return 0;
	}

	/*
	 * Store a directory entry in the tar file so we get the permissions
	 * right.
	 */
	if (!sizeonly)
		_tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &verdir_stat);

	/*
	 * 512 bytes for the header just accounted for, plus all the files in
	 * the tablespace version directory.
	 */
	return 512 + sendDir(verdir, strlen(path), sizeonly);
}

/*
 * Include all files from the given directory in the output tar stream. If
 * 'sizeonly' is true, we just calculate a total length and return it, without
 * actually sending anything.
 *
 * 'path' is the directory to scan.  'basepathlen' is the length of the
 * leading part of each path that is stripped (together with the following
 * slash) to form the member name written into the tar header.
 *
 * Returns the number of bytes accounted for: 512 per header plus each
 * regular file's size rounded up to a multiple of 512.  Subdirectories are
 * handled by recursion.
 */
static int64
sendDir(char *path, int basepathlen, bool sizeonly)
{
	DIR		   *dir;
	struct dirent *de;
	char		pathbuf[MAXPGPATH];
	struct stat statbuf;
	int64		size = 0;

	dir = AllocateDir(path);
	while ((de = ReadDir(dir, path)) != NULL)
	{
		/* Skip special stuff */
		if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
			continue;

		/* Skip temporary files */
		if (strncmp(de->d_name,
					PG_TEMP_FILE_PREFIX,
					strlen(PG_TEMP_FILE_PREFIX)) == 0)
			continue;

		/*
		 * If there's a backup_label file, it belongs to a backup started by
		 * the user with pg_start_backup(). It is *not* correct for this
		 * backup, our backup_label is injected into the tar separately.
		 */
		if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0)
			continue;

		/*
		 * Check if the postmaster has signaled us to exit, and abort with an
		 * error in that case. The error handler further up will call
		 * do_pg_abort_backup() for us.
		 */
		if (walsender_shutdown_requested || walsender_ready_to_stop)
			ereport(ERROR,
					(errmsg("shutdown requested, aborting active base backup")));

		snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name);

		/* Skip postmaster.pid and postmaster.opts in the data directory */
		if (strcmp(pathbuf, "./postmaster.pid") == 0 ||
			strcmp(pathbuf, "./postmaster.opts") == 0)
			continue;

		/* Skip pg_control here to back up it last */
		if (strcmp(pathbuf, "./global/pg_control") == 0)
			continue;

		if (lstat(pathbuf, &statbuf) != 0)
		{
			if (errno != ENOENT)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not stat file or directory \"%s\": %m",
								pathbuf)));

			/* If the file went away while scanning, it's no error. */
			continue;
		}

		/*
		 * We can skip pg_xlog, the WAL segments need to be fetched from the
		 * WAL archive anyway. But include it as an empty directory anyway, so
		 * we get permissions right.
		 */
		if (strcmp(pathbuf, "./pg_xlog") == 0)
		{
			if (!sizeonly)
			{
				/* If pg_xlog is a symlink, write it as a directory anyway */
#ifndef WIN32
				if (S_ISLNK(statbuf.st_mode))
#else
				if (pgwin32_is_junction(pathbuf))
#endif
					statbuf.st_mode = S_IFDIR | S_IRWXU;
				_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
			}
			size += 512;		/* Size of the header just added */
			continue;			/* don't recurse into pg_xlog */
		}

		/* Allow symbolic links in pg_tblspc only */
		if (strcmp(path, "./pg_tblspc") == 0 &&
#ifndef WIN32
			S_ISLNK(statbuf.st_mode)
#else
			pgwin32_is_junction(pathbuf)
#endif
			)
		{
#if defined(HAVE_READLINK) || defined(WIN32)
			char		linkpath[MAXPGPATH];
			int			rllen;

			rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
			if (rllen < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not read symbolic link \"%s\": %m",
								pathbuf)));
			if (rllen >= sizeof(linkpath))
				ereport(ERROR,
						(errmsg("symbolic link \"%s\" target is too long",
								pathbuf)));
			/* readlink() does not null-terminate its result */
			linkpath[rllen] = '\0';

			if (!sizeonly)
				_tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf);
			size += 512;		/* Size of the header just added */
#else

			/*
			 * If the platform does not have symbolic links, it should not be
			 * possible to have tablespaces - clearly somebody else created
			 * them. Warn about it and ignore.
			 */
			ereport(WARNING,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("tablespaces are not supported on this platform")));
			continue;
#endif   /* HAVE_READLINK */
		}
		else if (S_ISDIR(statbuf.st_mode))
		{
			/*
			 * Store a directory entry in the tar file so we can get the
			 * permissions right.
			 */
			if (!sizeonly)
				_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
			size += 512;		/* Size of the header just added */

			/* call ourselves recursively for a directory */
			size += sendDir(pathbuf, basepathlen, sizeonly);
		}
		else if (S_ISREG(statbuf.st_mode))
		{
			bool		sent = false;

			/* missing_ok: the file may vanish between lstat() and open */
			if (!sizeonly)
				sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
								true);

			if (sent || sizeonly)
			{
				/* Add size, rounded up to 512byte block */
				size += ((statbuf.st_size + 511) & ~511);
				size += 512;	/* Size of the header of the file */
			}
		}
		else
			ereport(WARNING,
					(errmsg("skipping special file \"%s\"", pathbuf)));
	}
	FreeDir(dir);
	return size;
}

/*****
* Functions for handling tar file format
*
* Copied from pg_dump, but modified to work with libpq for sending
*/


/*
 * Utility routine to print possibly larger than 32 bit integers in a
 * portable fashion. Filled with zeros.
 *
 * Writes exactly 'len' characters into 's', least significant digit last,
 * and does not null-terminate.  Digits of 'val' that do not fit in 'len'
 * positions are dropped.  Each digit is emitted as '0' + digit, so the
 * output is only meaningful for bases up to 10.
 */
static void
print_val(char *s, uint64 val, unsigned int base, size_t len)
{
	size_t		pos;

	/* Fill from the rightmost position towards the left */
	for (pos = len; pos > 0; pos--)
	{
		int			digit = val % base;

		s[pos - 1] = '0' + digit;
		val = val / base;
	}
}

/*
 * Maximum file size for a tar member: The limit inherent in the
 * format is 2^33-1 bytes (nearly 8 GB). But we don't want to exceed
 * what we can represent in pgoff_t.
 *
 * sendFile() raises an error for any file whose st_size exceeds this.
 */
#define MAX_TAR_MEMBER_FILELEN (((int64) 1 << Min(33, sizeof(pgoff_t)*8 - 1)) - 1)

/*
 * Compute the checksum of a 512-byte tar header block.
 *
 * 'header' must point to at least 512 bytes.  The 8 bytes of the checksum
 * field itself (offsets 148..155) are not read; they are counted as spaces.
 */
static int
_tarChecksum(char *header)
{
	int			i,
				sum;

	/*
	 * Per POSIX, the checksum is the simple sum of all bytes in the header,
	 * treating the bytes as unsigned, and treating the checksum field (at
	 * offset 148) as though it contained 8 spaces.
	 */
	sum = 8 * ' ';				/* presumed value for checksum field */
	for (i = 0; i < 512; i++)
		if (i < 148 || i >= 156)
			sum += 0xFF & header[i];
	return sum;
}

/*
 * Given the member, write the TAR header & send the file.
 *
 * 'readfilename' is the path opened for reading; 'tarfilename' is the name
 * stored in the tar header.  'statbuf' supplies the header fields, and
 * exactly statbuf->st_size bytes of content are sent: a file that grew in
 * the meantime is cut off at that size, and one that shrank is padded with
 * zeros.
 *
 * If 'missing_ok' is true, will not throw an error if the file is not found.
 *
 * Returns true if the file was successfully sent, false if 'missing_ok',
 * and the file did not exist.
 *
 * Raises an error if the file cannot be opened for another reason, is too
 * large for the tar format, cannot be read, or cannot be sent.
 */
static bool
sendFile(char *readfilename, char *tarfilename, struct stat *statbuf,
		 bool missing_ok)
{
	FILE	   *fp;
	char		buf[TAR_SEND_SIZE];
	size_t		cnt;
	pgoff_t		len = 0;
	size_t		pad;

	fp = AllocateFile(readfilename, "rb");
	if (fp == NULL)
	{
		if (errno == ENOENT && missing_ok)
			return false;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", readfilename)));
	}

	/*
	 * Some compilers will throw a warning knowing this test can never be true
	 * because pgoff_t can't exceed the compared maximum on their platform.
	 */
	if (statbuf->st_size > MAX_TAR_MEMBER_FILELEN)
		ereport(ERROR,
				(errmsg("archive member \"%s\" too large for tar format",
						tarfilename)));

	_tarWriteHeader(tarfilename, NULL, statbuf);

	while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
	{
		/* Send the chunk as a CopyData message */
		if (pq_putmessage('d', buf, cnt))
			ereport(ERROR,
					(errmsg("base backup could not send data, aborting backup")));

		len += cnt;

		if (len >= statbuf->st_size)
		{
			/*
			 * Reached end of file. The file could be longer, if it was
			 * extended while we were sending it, but for a base backup we can
			 * ignore such extended data. It will be restored from WAL.
			 */
			break;
		}
	}

	/*
	 * fread() returns a short count on a read error as well as at end of
	 * file.  Only the latter means the file was truncated; a read error must
	 * not be papered over with zero padding, or the backup would silently
	 * contain bogus data.
	 */
	if (ferror(fp))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\": %m", readfilename)));

	/* If the file was truncated while we were sending it, pad it with zeros */
	if (len < statbuf->st_size)
	{
		MemSet(buf, 0, sizeof(buf));
		while (len < statbuf->st_size)
		{
			cnt = Min(sizeof(buf), statbuf->st_size - len);
			pq_putmessage('d', buf, cnt);
			len += cnt;
		}
	}

	/* Pad to 512 byte boundary, per tar format requirements */
	pad = ((len + 511) & ~511) - len;
	if (pad > 0)
	{
		MemSet(buf, 0, pad);
		pq_putmessage('d', buf, pad);
	}

	FreeFile(fp);

	return true;
}


/*
 * Build a 512-byte tar header for one archive member and send it as a
 * CopyData message.
 *
 * 'filename' is the member name; at most 99 characters of it are stored.
 * 'linktarget', if not NULL, makes the member a symbolic link to that
 * target (again at most 99 characters are stored).  'statbuf' supplies the
 * mode, uid, gid, size and modification time.
 *
 * Symbolic links and directories get a trailing slash appended to the name
 * and a size of zero.
 */
static void
_tarWriteHeader(const char *filename, const char *linktarget,
				struct stat * statbuf)
{
	char		h[512];

	/*
	 * Note: most of the fields in a tar header are not supposed to be
	 * null-terminated. We use sprintf, which will write a null after the
	 * required bytes; that null goes into the first byte of the next field.
	 * This is okay as long as we fill the fields in order.
	 */
	memset(h, 0, sizeof(h));

	/* Name 100 */
	sprintf(&h[0], "%.99s", filename);
	if (linktarget != NULL || S_ISDIR(statbuf->st_mode))
	{
		/*
		 * We only support symbolic links to directories, and this is
		 * indicated in the tar format by adding a slash at the end of the
		 * name, the same as for regular directories.
		 */
		int			flen = strlen(filename);

		flen = Min(flen, 99);
		h[flen] = '/';
		h[flen + 1] = '\0';
	}

	/* Mode 8 */
	sprintf(&h[100], "%07o ", (int) statbuf->st_mode);

	/* User ID 8 */
	sprintf(&h[108], "%07o ", statbuf->st_uid);

	/* Group 8 */
	sprintf(&h[116], "%07o ", statbuf->st_gid);

	/* File size 12 - 11 digits, 1 space; use print_val for 64 bit support */
	if (linktarget != NULL || S_ISDIR(statbuf->st_mode))
		/* Symbolic link or directory has size zero */
		print_val(&h[124], 0, 8, 11);
	else
		print_val(&h[124], statbuf->st_size, 8, 11);
	sprintf(&h[135], " ");

	/* Mod Time 12 */
	sprintf(&h[136], "%011o ", (int) statbuf->st_mtime);

	/* Checksum 8 cannot be calculated until we've filled all other fields */

	if (linktarget != NULL)
	{
		/* Type - Symbolic link */
		sprintf(&h[156], "2");
		/* Link Name 100 */
		sprintf(&h[157], "%.99s", linktarget);
	}
	else if (S_ISDIR(statbuf->st_mode))
		/* Type - directory */
		sprintf(&h[156], "5");
	else
		/* Type - regular file */
		sprintf(&h[156], "0");

	/* Magic 6 */
	sprintf(&h[257], "ustar");

	/* Version 2 */
	sprintf(&h[263], "00");

	/* User 32 */
	/* XXX: Do we need to care about setting correct username? */
	sprintf(&h[265], "%.31s", "postgres");

	/* Group 32 */
	/* XXX: Do we need to care about setting correct group name? */
	sprintf(&h[297], "%.31s", "postgres");

	/* Major Dev 8 */
	sprintf(&h[329], "%07o ", 0);

	/* Minor Dev 8 */
	sprintf(&h[337], "%07o ", 0);

	/* Prefix 155 - not used, leave as nulls */

	/*
	 * We mustn't overwrite the next field while inserting the checksum.
	 * Fortunately, the checksum can't exceed 6 octal digits, so we just write
	 * 6 digits, a space, and a null, which is legal per POSIX.
	 */
	sprintf(&h[148], "%06o ", _tarChecksum(h));

	/* Now send the completed header. */
	pq_putmessage('d', h, 512);
}

PostgreSQL中流复制pg_basebackup做了什么

标签:

原文地址:http://www.cnblogs.com/songyuejie/p/4719240.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!