/*
 *	PROGRAM:	Object oriented API samples.
 *	MODULE:		11.batch.cpp
 *	DESCRIPTION:	A trivial sample of using Batch interface.
 *
 *					Example for the following interfaces:
 *					IBatch - interface to work with FB batches
 *					IBatchCompletionState - contains result of batch execution
 *
 *	c++ 11.batch.cpp -lfbclient
 *
 *  The contents of this file are subject to the Initial
 *  Developer's Public License Version 1.0 (the "License");
 *  you may not use this file except in compliance with the
 *  License. You may obtain a copy of the License at
 *  http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl.
 *
 *  Software distributed under the License is distributed AS IS,
 *  WITHOUT WARRANTY OF ANY KIND, either express or implied.
 *  See the License for the specific language governing rights
 *  and limitations under the License.
 *
 *  The Original Code was created by Alexander Peshkoff
 *  for the Firebird Open Source RDBMS project.
 *
 *  Copyright (c) 2017 Alexander Peshkoff <peshkoff@mail.ru>
 *  and all contributors signed below.
 *
 *  All Rights Reserved.
 *  Contributor(s): ______________________________________.
 */

#include "ifaceExamples.h"
#include <firebird/Message.h>

static IMaster* master = fb_get_master_interface();


// output error message to user

static void errPrint(IStatus* status)
{
	char buf[256];
	master->getUtilInterface()->formatStatus(buf, sizeof(buf), status);
	fprintf(stderr, "%s\n", buf);
}


// align target to alignment boundary

template <typename T>
static inline T align(T target, uintptr_t alignment)
{
	return (T) ((((uintptr_t) target) + alignment - 1) & ~(alignment - 1));
}


// append given message to buffer ptr

static void putMsg(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment)
{
	memcpy(ptr, from, size);
	ptr += align(size, alignment);
}


// append blob header with BPB to buffer ptr
// return pointer to blob size field - prefilled with BPB size

static unsigned* putBlobHdr(unsigned char*& ptr, unsigned alignment, ISC_QUAD* id, unsigned bpbSize, const unsigned char* bpb)
{
	ptr = align(ptr, alignment);

	memcpy(ptr, id, sizeof(ISC_QUAD));
	ptr += sizeof(ISC_QUAD);

	unsigned* rc = reinterpret_cast<unsigned*>(ptr);

	memcpy(ptr, &bpbSize, sizeof(unsigned));
	ptr += sizeof(unsigned);
	memcpy(ptr, &bpbSize, sizeof(unsigned));
	ptr += sizeof(unsigned);

	memcpy(ptr, bpb, bpbSize);
	ptr += bpbSize;

	return rc;
}


// append given blob to buffer ptr

static void putBlob(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment, ISC_QUAD* id)
{
	unsigned* sizePtr = putBlobHdr(ptr, alignment, id, 0, NULL);
	memcpy(ptr, from, size);
	*sizePtr += size;
	ptr += size;

	ptr = align(ptr, alignment);
}


// append given segment to buffer ptr

unsigned putSegment(unsigned char*& ptr, const char* testData)
{
	ptr = align(ptr, IBatch::BLOB_SEGHDR_ALIGN);
	unsigned short l = strlen(testData);
	memcpy(ptr, &l, sizeof l);
	ptr += sizeof l;
	memcpy(ptr, testData, l);
	ptr += l;
	return align(l + sizeof l, IBatch::BLOB_SEGHDR_ALIGN);
}

// batch info printer - prints what we know about batch

static void printInfo(ThrowStatusWrapper& status, const char* hdr, IBatch* b, IUtil* utl)
{
	printf("\n%s\n", hdr);

	const unsigned char items[] = {IBatch::INF_BLOB_ALIGNMENT, IBatch::INF_BUFFER_BYTES_SIZE,
								   IBatch::INF_DATA_BYTES_SIZE, IBatch::INF_BLOBS_BYTES_SIZE};
	unsigned char buffer[29];
	b->getInfo(&status, sizeof items, items, sizeof buffer, buffer);

	IXpbBuilder* pb = utl->getXpbBuilder(&status, IXpbBuilder::INFO_RESPONSE, buffer, sizeof buffer);
	for (pb->rewind(&status); !pb->isEof(&status); pb->moveNext(&status))
	{
		int val = pb->getInt(&status);
		const char* text = "Unknown tag";
		switch (pb->getTag(&status))
		{
		case IBatch::INF_BLOB_ALIGNMENT:
			text = "Blob alignment";
			break;
		case IBatch::INF_BUFFER_BYTES_SIZE:
			text = "Buffer size";
			break;
		case IBatch::INF_DATA_BYTES_SIZE:
			text = "Messages size";
			break;
		case IBatch::INF_BLOBS_BYTES_SIZE:
			text = "Blobs size";
			break;
		case isc_info_truncated:
			printf("  truncated\n");
			// fall down...
		case isc_info_end:
			pb->dispose();
			return;
		default:
			printf("Unexpected item %d\n", pb->getTag(&status));
			pb->dispose();
			return;
		}

		printf("%s = %d\n", text, val);
	}
	pb->dispose();
}

// BatchCompletionState printer - prints all what we know about completed batch

static void print_cs(ThrowStatusWrapper& status, IBatchCompletionState* cs, IUtil* utl)
{
	unsigned p = 0;
	IStatus* s2 = NULL;
	bool pr1 = false, pr2 = false;

	// 1. Print per-message state info

	unsigned upcount = cs->getSize(&status);
	unsigned unk = 0, succ = 0;
	for (p = 0; p < upcount; ++p)
	{
		int s = cs->getState(&status, p);
		switch (s)
		{
		case IBatchCompletionState::EXECUTE_FAILED:
			if (!pr1)
			{
				printf("Message Status\n");
				pr1 = true;
			}
			printf("%5u   Execute failed\n", p);
			break;

		case IBatchCompletionState::SUCCESS_NO_INFO:
			++unk;
			break;

		default:
			if (!pr1)
			{
				printf("Message Status\n");
				pr1 = true;
			}
			printf("%5u   Updated %d record(s)\n", p, s);
			++succ;
			break;
		}
	}
	printf("Summary: total=%u success=%u success(but no update info)=%u\n", upcount, succ, unk);

	// 2. Print detailed errors (if exist) for messages

	s2 = master->getStatus();
	for(p = 0; (p = cs->findError(&status, p)) != IBatchCompletionState::NO_MORE_ERRORS; ++p)
	{
		try
		{
			cs->getStatus(&status, s2, p);

			char text[1024];
			utl->formatStatus(text, sizeof(text) - 1, s2);
			text[sizeof(text) - 1] = 0;
			if (!pr2)
			{
				printf("\nDetailed errors status:\n");
				pr2 = true;
			}
			printf("Message %u: %s\n", p, text);
		}
		catch (const FbException& error)
		{
			// handle error
			fprintf(stderr, "\nError describing message %u\n", p);
			errPrint(error.getStatus());
			fprintf(stderr, "\n");
		}
	}

	if (s2)
		s2->dispose();
}

int main()
{
	int rc = 0;

	// set default password if none specified in environment
	setenv("ISC_USER", "sysdba", 0);
	setenv("ISC_PASSWORD", "masterkey", 0);

	// With ThrowStatusWrapper passed as status interface FbException will be thrown on error
	ThrowStatusWrapper status(master->getStatus());

	// Declare pointers to required interfaces
	IProvider* prov = master->getDispatcher();
	IUtil* utl = master->getUtilInterface();
	IAttachment* att = NULL;
	ITransaction* tra = NULL;
	IBatch* batch = NULL;
	IBatchCompletionState* cs = NULL;
	IXpbBuilder* pb = NULL;

	unsigned char streamBuf[10240];		// big enough for demo
	unsigned char* stream = NULL;

	try
	{
		// attach employee db
		att = prov->attachDatabase(&status, "employee", 0, NULL);
		tra = att->startTransaction(&status, 0, NULL);

		// cleanup
		att->execute(&status, tra, 0, "delete from project where proj_id like 'BAT%'", SAMPLES_DIALECT,
			NULL, NULL, NULL, NULL);

		//
		printf("\nPart 1. Simple messages. Adding one by one or by groups of messages, cancel batch.\n");
		//

		// Message to store in a table
		FB_MESSAGE(Msg1, ThrowStatusWrapper,
			(FB_VARCHAR(5), id)
			(FB_VARCHAR(10), name)
		) project1(&status, master);
		project1.clear();
		IMessageMetadata* meta = project1.getMetadata();

		// sizes & alignments
		unsigned mesAlign = meta->getAlignment(&status);
		unsigned mesLength = meta->getMessageLength(&status);
		unsigned char* streamStart = align(streamBuf, mesAlign);

		// set batch parameters
		pb = utl->getXpbBuilder(&status, IXpbBuilder::BATCH, NULL, 0);
		// collect per-message statistics
		pb->insertInt(&status, IBatch::TAG_RECORD_COUNTS, 1);

		// create batch
		const char* sqlStmt1 = "insert into project(proj_id, proj_name) values(?, ?)";
		batch = att->createBatch(&status, tra, 0, sqlStmt1, SAMPLES_DIALECT, meta,
			pb->getBufferLength(&status), pb->getBuffer(&status));

		// fill batch with data record by record
		project1->id.set("BAT11");
		project1->name.set("SNGL_REC1");
		batch->add(&status, 1, project1.getData());

		project1->id.set("BAT12");
		project1->name.set("SNGL_REC2");
		batch->add(&status, 1, project1.getData());

		// execute it
		cs = batch->execute(&status, tra);
		print_cs(status, cs, utl);

		// add a big set of same records ...
		for (int i = 0; i < 100000; ++i)
		{
			project1->id.set("BAT11");
			project1->name.set("SNGL_REC");
			batch->add(&status, 1, project1.getData());
		}

		// check batch state
		printInfo(status, "Info when added many records", batch, utl);

		// ... and cancel that records
		batch->cancel(&status);

		// fill batch with data using many records at once
		stream = streamStart;

		project1->id.set("BAT13");
		project1->name.set("STRM_REC_A");
		putMsg(stream, project1.getData(), mesLength, mesAlign);

		project1->id.set("BAT14");
		project1->name.set("STRM_REC_B");
		putMsg(stream, project1.getData(), mesLength, mesAlign);

		project1->id.set("BAT15");
		project1->name.set("STRM_REC_C");
		putMsg(stream, project1.getData(), mesLength, mesAlign);

		batch->add(&status, 3, streamStart);

		stream = streamStart;

		project1->id.set("BAT15");		// constraint violation
		project1->name.set("STRM_REC_D");
		putMsg(stream, project1.getData(), mesLength, mesAlign);

		project1->id.set("BAT16");		// will not be processed due to return on single error
		project1->name.set("STRM_REC_E");
		putMsg(stream, project1.getData(), mesLength, mesAlign);

		batch->add(&status, 2, streamStart);

		// execute it
		cs = batch->execute(&status, tra);
		print_cs(status, cs, utl);

		// close batch
		batch->close(&status);
		batch = NULL;

		//
		printf("\nPart 2. Simple BLOBs. Multiple errors return.\n");
		//

		// Message to store in a table
		FB_MESSAGE(Msg2, ThrowStatusWrapper,
			(FB_VARCHAR(5), id)
			(FB_VARCHAR(10), name)
			(FB_BLOB, desc)
		) project2(&status, master);
		project2.clear();
		meta = project2.getMetadata();

		mesAlign = meta->getAlignment(&status);
		mesLength = meta->getMessageLength(&status);
		streamStart = align(streamBuf, mesAlign);

		// set batch parameters
		pb->clear(&status);
		// continue batch processing in case of errors in some messages
		pb->insertInt(&status, IBatch::TAG_MULTIERROR, 1);
		// enable blobs processing - IDs generated by firebird engine
		pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_ID_ENGINE);

		// create batch
		const char* sqlStmt2 = "insert into project(proj_id, proj_name, proj_desc) values(?, ?, ?)";
		batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
			pb->getBufferLength(&status), pb->getBuffer(&status));

		// fill batch with data
		project2->id.set("BAT21");
		project2->name.set("SNGL_BLOB");
		batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
		batch->appendBlobData(&status, 1, "\n");
		batch->appendBlobData(&status, strlen(sqlStmt1), sqlStmt1);
		batch->add(&status, 1, project2.getData());

		printInfo(status, "Info with blob", batch, utl);

		// execute it
		cs = batch->execute(&status, tra);
		print_cs(status, cs, utl);

		// fill batch with data
		project2->id.set("BAT22");
		project2->name.set("SNGL_REC1");
		batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
		batch->add(&status, 1, project2.getData());

		project2->id.set("BAT22");
		project2->name.set("SNGL_REC2");	// constraint violation
		batch->addBlob(&status, 2, "r2", &project2->desc, 0, NULL);
		batch->add(&status, 1, project2.getData());

		project2->id.set("BAT23");
		project2->name.set("SNGL_REC3");
		batch->addBlob(&status, 2, "r3", &project2->desc, 0, NULL);
		batch->add(&status, 1, project2.getData());

		project2->id.set("BAT23");			// constraint violation
		project2->name.set("SNGL_REC4");
		batch->addBlob(&status, 2, "r4", &project2->desc, 0, NULL);
		batch->add(&status, 1, project2.getData());

		// execute it
		cs = batch->execute(&status, tra);
		print_cs(status, cs, utl);

		// close batch
		batch->close(&status);
		batch = NULL;

		//
		printf("\nPart 3. BLOB stream, including segmented BLOB.\n");
		//

		// use Msg2/project2/sqlStmt2 to store in a table

		// set batch parameters
		pb->clear(&status);
		// enable blobs processing - blobs are placed in a stream
		pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_STREAM);

		// create batch
		batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
			pb->getBufferLength(&status), pb->getBuffer(&status));

		unsigned blobAlign = batch->getBlobAlignment(&status);

		// prepare blob IDs
		ISC_QUAD v1={0,1}, v2={0,2}, v3={0,3};

		// send messages to batch
		project2->id.set("BAT31");
		project2->name.set("STRM_BLB_A");
		project2->desc = v1;
		batch->add(&status, 1, project2.getData());

		project2->id.set("BAT32");
		project2->name.set("STRM_BLB_B");
		project2->desc = v2;
		batch->add(&status, 1, project2.getData());

		project2->id.set("BAT33");
		project2->name.set("STRM_BLB_C");
		project2->desc = v3;
		batch->add(&status, 1, project2.getData());

		// prepare blobs in the stream buffer

		const char* d1 = "1111111111111111111";
		const char* d2 = "22222222222222222222";
		const char* d3 = "33333333333333333333333333333333333333333333333333333";

		stream = streamStart;
		putBlob(stream, d1, strlen(d1), blobAlign, &v1);
		putBlob(stream, d2, strlen(d2), blobAlign, &v2);
		putBlob(stream, d3, strlen(d3), blobAlign, &v3);

		batch->addBlobStream(&status, stream - streamStart, streamStart);

		// Continue last blob
		stream = streamStart;
		ISC_QUAD nullId = {0,0};
		unsigned* size = putBlobHdr(stream, blobAlign, &nullId, 0, NULL);

		const char* d4 = " 444444444444444444444444";
		unsigned ld4 = strlen(d4);

		memcpy(stream, d4, ld4);
		*size += ld4;
		stream += ld4;
		stream = align(stream, blobAlign);

		stream = align(stream, blobAlign);
		batch->addBlobStream(&status, stream - streamStart, streamStart);

		// Put segmented Blob in the stream

		// add message
		ISC_QUAD vSeg={0,10};
		project2->id.set("BAT35");
		project2->name.set("STRM_B_SEG");
		project2->desc = vSeg;
		batch->add(&status, 1, project2.getData());

		// build BPB
		pb->dispose();
		pb = NULL;
		pb = utl->getXpbBuilder(&status, IXpbBuilder::BPB, NULL, 0);
		pb->insertInt(&status, isc_bpb_type, isc_bpb_type_segmented);

		// make stream
		stream = streamStart;
		size = putBlobHdr(stream, blobAlign, &vSeg, pb->getBufferLength(&status), pb->getBuffer(&status));
		*size += putSegment(stream, d1);
		*size += putSegment(stream, "\n");
		*size += putSegment(stream, d2);
		*size += putSegment(stream, "\n");
		*size += putSegment(stream, d3);

		// add stream to the batch
		stream = align(stream, blobAlign);
		batch->addBlobStream(&status, stream - streamStart, streamStart);

		// execute batch
		cs = batch->execute(&status, tra);
		print_cs(status, cs, utl);

		//
		printf("\nPart 4. BLOB created using IBlob interface.\n");
		//

		// use Msg2/project2/sqlStmt2 to store in a table
		// registerBlob() may be called in BLOB_STREAM batch, ID should be generated by user in this case
		// also demonstrates execution of same batch multiple times

		// create blob
		ISC_QUAD realId;
		IBlob* blob = att->createBlob(&status, tra, &realId, 0, NULL);
		const char* text = "Blob created using traditional API";
		blob->putSegment(&status, strlen(text), text);
		blob->close(&status);

		// add message
		project2->id.set("BAT38");
		project2->name.set("FRGN_BLB");
		project2->desc = v1;	// after execute may reuse IDs
		batch->registerBlob(&status, &realId, &project2->desc);
		batch->add(&status, 1, project2.getData());

		// execute it
		cs = batch->execute(&status, tra);
		print_cs(status, cs, utl);

		// cleanup
		batch->close(&status);
		batch = NULL;
		tra->commit(&status);
		tra = NULL;
		att->detach(&status);
		att = NULL;
	}
	catch (const FbException& error)
	{
		// handle error
		rc = 1;
		errPrint(error.getStatus());
	}

	// release interfaces after error caught
	if (cs)
		cs->dispose();
	if (batch)
		batch->release();
	if (tra)
		tra->release();
	if (att)
		att->release();

	// cleanup
	if (pb)
		pb->dispose();
	status.dispose();
	prov->release();

	return rc;
}
