ndbapi_event.cpp

/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <NdbApi.hpp>

// Used for cout
#include <stdio.h>
#include <iostream>
#include <unistd.h>


#define APIERROR(error) \
  { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
              << error.code << ", msg: " << error.message << "." << std::endl; \
    exit(-1); }

int myCreateEvent(Ndb* myNdb,
      const char *eventName,
      const char *eventTableName,
      const char **eventColumnName,
      const int noEventColumnName);

int main()
{
  ndb_init();

  Ndb_cluster_connection *cluster_connection=
    new Ndb_cluster_connection(); // Object representing the cluster

  int r= cluster_connection->connect(5 /* retries               */,
             3 /* delay between retries */,
             1 /* verbose               */);
  if (r > 0)
  {
    std::cout
      << "Cluster connect failed, possibly resolved with more retries.\n";
    exit(-1);
  }
  else if (r < 0)
  {
    std::cout
      << "Cluster connect failed.\n";
    exit(-1);
  }
             
  if (cluster_connection->wait_until_ready(30,30))
  {
    std::cout << "Cluster was not ready within 30 secs." << std::endl;
    exit(-1);
  }

  Ndb* myNdb= new Ndb(cluster_connection,
          "TEST_DB");  // Object representing the database

  if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());

  const char *eventName= "CHNG_IN_TAB0";
  const char *eventTableName= "TAB0";
  const int noEventColumnName= 3;
  const char *eventColumnName[noEventColumnName]=
    {"COL0",
     "COL1",
     "COL11"};
  
  // Create events
  myCreateEvent(myNdb,
    eventName,
    eventTableName,
    eventColumnName,
    noEventColumnName);

  int j= 0;
  while (j < 5) {

    // Start "transaction" for handling events
    NdbEventOperation* op;
    printf("create EventOperation\n");
    if ((op = myNdb->createEventOperation(eventName,100)) == NULL)
      APIERROR(myNdb->getNdbError());

    printf("get values\n");
    NdbRecAttr* recAttr[noEventColumnName];
    NdbRecAttr* recAttrPre[noEventColumnName];
    // primary keys should always be a part of the result
    for (int i = 0; i < noEventColumnName; i++) {
      recAttr[i]    = op->getValue(eventColumnName[i]);
      recAttrPre[i] = op->getPreValue(eventColumnName[i]);
    }

    // set up the callbacks
    printf("execute\n");
    // This starts changes to "start flowing"
    if (op->execute())
      APIERROR(op->getNdbError());

    int i= 0;
    while(i < 40) {
      // printf("now waiting for event...\n");
      int r= myNdb->pollEvents(1000); // wait for event or 1000 ms
      if (r > 0) {
  // printf("got data! %d\n", r);
  int overrun;
  while (op->next(&overrun) > 0) {
    i++;
    if (!op->isConsistent())
      printf("A node failure has occured and events might be missing\n");
    switch (op->getEventType()) {
    case NdbDictionary::Event::TE_INSERT:
      printf("%u INSERT: ", i);
      break;
    case NdbDictionary::Event::TE_DELETE:
      printf("%u DELETE: ", i);
      break;
    case NdbDictionary::Event::TE_UPDATE:
      printf("%u UPDATE: ", i);
      break;
    default:
      abort(); // should not happen
    }
    printf("overrun %u pk %u: ", overrun, recAttr[0]->u_32_value());
    for (int i = 1; i < noEventColumnName; i++) {
      if (recAttr[i]->isNULL() >= 0) { // we have a value
        printf(" post[%u]=", i);
        if (recAttr[i]->isNULL() == 0) // we have a non-null value
    printf("%u", recAttr[i]->u_32_value());
        else                           // we have a null value
    printf("NULL");
      }
      if (recAttrPre[i]->isNULL() >= 0) { // we have a value
        printf(" pre[%u]=", i);
        if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
    printf("%u", recAttrPre[i]->u_32_value());
        else                              // we have a null value
    printf("NULL");
      }
    }
    printf("\n");
  }
      } else
  ;//printf("timed out\n");
    }
    // don't want to listen to events anymore
    if (myNdb->dropEventOperation(op)) APIERROR(myNdb->getNdbError());

    j++;
  }

  {
    NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
    if (!myDict) APIERROR(myNdb->getNdbError());
    // remove event from database
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
  }

  delete myNdb;
  delete cluster_connection;
  ndb_end(0);
  return 0;
}

int myCreateEvent(Ndb* myNdb,
      const char *eventName,
      const char *eventTableName,
      const char **eventColumnNames,
      const int noEventColumnNames)
{
  NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
  if (!myDict) APIERROR(myNdb->getNdbError());

  const NdbDictionary::Table *table= myDict->getTable(eventTableName);
  if (!table) APIERROR(myDict->getNdbError());

  NdbDictionary::Event myEvent(eventName, *table);
  myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); 
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT); 
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE); 
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);

  myEvent.addEventColumns(noEventColumnNames, eventColumnNames);

  // Add event to database
  if (myDict->createEvent(myEvent) == 0)
    myEvent.print();
  else if (myDict->getNdbError().classification ==
     NdbError::SchemaObjectExists) {
    printf("Event creation failed, event exists\n");
    printf("dropping Event...\n");
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
    // try again
    // Add event to database
    if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
  } else
    APIERROR(myDict->getNdbError());

  return 0;
}

Documentation generated Fri Dec 7 14:54:14 2007 from mysql source files.
© 2003-2004 MySQL AB