Neo4J: CSV Import aus MySQL

bbcode-image


Oft werden NoSQL für sehr spezielle Fälle eingesetzt. Die normale Datenhaltung bleibt weiter hin den SQL-Datenbanken überlassen. Also müssen regelmäßig die Daten aus dem SQL-Bestand in die NoSQL Datenbank kopiert werden. Das dauert oft und viele aufbereitungen der Daten wird schon hier erledigt. die NoSQL Varianten sind deswegen auch oft schneller, weil man eine Teil der Arbeit in den Import-Jobs erledigt, die sonst bei jedem Query als Overhead entstehen. Natürlich haben die NoSQL auch ohne das ihre Vorteile, aber man sollte immer im Auge behalten, ob die Performance von der Engine kommt oder auch von der Optimierung der Daten, weil die Optimierungen der Daten könnte man auch in die SQL-Struktur zurück fließen lassen und diese in die Richtung hin verbessern.

So ein Import dauert... wenn man in der Nacht ein Zeitfenster von einer Stunde hat, ist alles kein Problem. Will man aber auch in kurzen Abständen importieren, muss der Import schnell laufen. Auch wenn man als Entwickler öfters mal den Import braucht, ist es wichtig möglichst viel Performance zu haben.
Hier geht es darum wie man möglichst schnell und einfach Daten aus einer MySQL Datenbank in eine Neo4j Graphen-Datenbank importieren kann, ohne viel Overhead zu erzeugen. Ich verwende hier PHP, aber da an sich keine Logik in PHP implementiert werden wird, kann man ganz leicht auf jeden andere Sprache, wie Java, JavaScript mit node.js und so übertragen. Es werden keine ORMs verwendet (die extrem viel Overhead erzeugen und viel Performance kosten) sondern nur SQL und Cypher.

Wie man einfach sich eine oder mehrere Neo4J-Instanzen anlegt (unter Linux) kann man hier sehr gut sehen:


Wir verwenden bei Neo4j den Import über eine CSV-Datei. Wir werden also nicht jeden Datensatz einzeln Lesen und Schreiben, sondern immer sehr viele auf einmal. Ob man alles in einer Transaktion laufen lässt und erst am Ende commited hängt etwas von der Datenmenge ab. Bis 200.000 Nodes und Relations ist alles kein Problem.. bei Millionen von Datensätzen sollte man aber nochmal drüber nachdenken.
PERIODIC COMMIT ist da eine super Lösung, um alles automatisch laufen zu lassen und sich nicht selbst darum kümmern zu müssen, wann commited wird. Alle 1000 bis 10_000 Datensätze ein Commit sollte gut sein, wobei ich eher zu 10_000 raten würde, weil 1000 doch noch sehr viele Commits sind und so mit der Overhead noch relativ groß ist.

Unsere Beispiel Datenbank sieht so aus:


CREATE TABLE USERS(
USER_ID INT(11) UNSGINED NOT NULL,
USER_NAME VARCHAR(255) NOT NULL,
PRIMARY KEY (USER_ID)
);

CREATE TABLE MESSAGES(
MESSAGE_ID INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
MESSAGE_TITLE VARCHAR(255) NOT NULL,
FROM_ID INT(11) UNSIGNED NOT NULL,
TO_ID INT(11) UNSIGNED NOT NULL,
CC_ID INT(11) UNSIGNED NOT NULL,
PRIMARY KEY (MESSAGE_ID)
);


Wir legen uns 50.000 User an dann noch 100.000 Messages mit jeweils einen FROM, einem TO und einem CC (hier hätte man über eine Link-Table sollen, aber das hier ist nur ein kleines Beispiel, wo das so reicht). Das sollten erst einmal genug Daten sein. (Offtopic: da ich das gerade neben bei auch in PHP schreibe.. warum kann ich für eine 100000 nicht wie in Java 100_000 schreiben?)

Die erste Schwierigkeit ist es die Daten schnell zu exportieren. Ziel ist eine CSV. Wir könnten entweder über PHP die Daten lesen und in eine Datei schreiben oder aber einfach die OUTFILE-Funktion von MySQL nutzen, um die Datenbank diese Arbeit erledigen zu lassen. Wir werden es so machen und erstellen für jede Art von Nodes und Relations eine eigene CSV. Weil wir Header haben wollen fügen wir diese mit UNION einmal oben hinzu


$sql="
SELECT 'user_id', 'user_name'
UNION
SELECT USER_ID,USERNAME
FROM USERS
INTO OUTFILE ".$exchangeFolder."/users.csv
FIELDS TERMINATED BY ','
ENCLOSED BY ''
LINES TERMINATED BY '\n'
";


Damit schreibt MySQL das Ergebnis des Queries in die angegebene Datei. Falls ein Fehler auftritt, muss man gucken, ob der Benutzer unter dem die MySQL-DB läuft in das Verzeichnis schreiben darf und ob nicht eine Anwendung wie apparmor unter Linux nicht den Zugriff blockiert. Es darf keine Datei mit diesen Namen schon vorhanden sein, sonst liefert MySQL auch nur einen Fehler zurück. Wir müssen
die Dateien also vorher löschen und dass machen wir einfach über PHP. Also muss auch der Benutzer unter dem die PHP-Anwendung läuft entsprechende Rechte haben.
Man kann das gut einmal direkt mit phpmyadmin oder einem entsprechenden Programm wie der MySQL Workbench testen. Wenn die Datei erzeugt und befüllt wird ist alles richtig eingestellt.

Mit dem Erstellen der CSV-Datei ist schon mal die Hälfte geschafft. Damit der Import auch schnell geht brauchen wir einen Index für unsere Nodes. Man kann einen Index schon anlegen, wenn noch gar kein Node des Types erstellt wurde. Zum Importieren der User benutzen wir folgendes Cypher-Statement:


$cyp="USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///".$exchangeFolder."/messages.csv" AS row
MERGE (m:message{mid:row.msg_id,title:row.msg_title});";


Der Pfad zur Datei wird als File-URL angegeben. Hier merkt man auch Neo4J seine Java-Basis an. Wenn man mal in eine Temp-Verzeichnis schaut sieht man dort auch Spuren von Jetty.

Am Ende wird der Importer nur eine Reihe von SQL und Cypher Statements ausführen. Wir benötigen um komfortabel zu arbeiten 3 Hilfsmethoden. Dass alles in richtige Klassen zu verpacken wäre natürlich besser, aber es reicht zum erklären erst einmal ein Funktionsbasierter Ansatz.

Da MySQL keine Dateien überschreiben will, brauchen wir eine Funktion zum Aufräumen des Verzeichnisses über das die CSV-Dateien ausgetauscht werden. Wir räumen einmal davor und einmal danach auf. Dann ist es kein Problem den Importer beim Testen mal mittendrin zu stoppen oder wenn er mal doch mit einem Fehler abbricht.


function cleanFolder($folder){
$files=scandir($folder);
foreach($files as $file){
if(preg_match("/\.csv$/i", $file)){
unlink($folder."/".$file);
}
}
}


Für SQL wird wieder mein PDBC verwendet.


include_once("PDBC/PDBCDBFactory.php");
PDBCDBFactory::init("PDBC/dbclasses/","PDBC/conffiles/");
$db=PDBCCache::getInstance()->getDB("embdoop");


Für Neo4J bauen wir uns eine eigen kleine Funktion.


use Everyman\Neo4j\Client;
use Everyman\Neo4j\Cypher\Query;

$client = new Everyman\Neo4j\Client();
$client->getTransport()->setAuth("neo4j","blubb");

function executeCypher($query){
global $client;
$query=new Query($client, $query);
$query->getResultSet();
}


Der Rest ist nun sehr einfach und linear. Ich glaube ich muss da nicht viel erklären und jeder Erkennt sehr schnell wie alles abläuft. Interessant ist wohl das Cypher-Statement für die Receive-Relations, da neben der Relation diese auch mit einem Attribute versehen wird im SET Bereich.


//clear for export (if a previous import failed)
cleanFolder($exchangeFolder);

//export nodes
echo "create users.csv\n";
$sql=" SELECT 'user_id', 'user_name' UNION
SELECT USER_ID,USER_NAME
FROM USERS
INTO OUTFILE '".$exchangeFolder."/users.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY ''
LINES TERMINATED BY '\n'";
$db->execute($sql);

echo "create messages.csv\n";
$sql=" SELECT 'msg_id', 'msg_title' UNION
SELECT MESSAGE_ID, MESSAGE_TITLE
FROM MESSAGES
INTO OUTFILE '".$exchangeFolder."/messages.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY ''
LINES TERMINATED BY '\n'";
$db->execute($sql);

//export relations
echo "create relations_etc.csv\n";
$sql=" SELECT 'user_id', 'msg_id', 'type' UNION
SELECT TO_ID, MESSAGE_ID, 'TO'
FROM MESSAGES
UNION
SELECT CC_ID, MESSAGE_ID, 'CC'
FROM MESSAGES

INTO OUTFILE '".$exchangeFolder."/relations_etc.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY ''
LINES TERMINATED BY '\n'";
$db->execute($sql);

echo "create relations_from.csv\n";
$sql=" SELECT 'user_id', 'msg_id', 'type' UNION
SELECT FROM_ID, MESSAGE_ID, 'FROM'
FROM MESSAGES

INTO OUTFILE '".$exchangeFolder."/relations_from.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY ''
LINES TERMINATED BY '\n'";
$db->execute($sql);

//create indexes for fast import
echo "create index's in neo4j\n";
$cyp="CREATE INDEX ON :user(uid);";
executeCypher($cyp);
$cyp="CREATE INDEX ON :message(mid);";
executeCypher($cyp);

//import nodes
echo "import users.csv\n";
$cyp="USING PERIODIC COMMIT 10000\n
LOAD CSV WITH HEADERS FROM "file:///".$exchangeFolder."/users.csv" AS row\n
MERGE (u:user{uid:row.user_id,name:row.user_name});";
executeCypher($cyp);

echo "import messages.csv\n";
$cyp="USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///".$exchangeFolder."/messages.csv" AS row
MERGE (m:message{mid:row.msg_id,title:row.msg_title});";
executeCypher($cyp);

//import relations
echo "import relations_from.csv\n";
$cyp="USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///".$exchangeFolder."/relations_from.csv" AS row
MATCH(u:user{uid:row.user_id})
MATCH(m:message{mid:row.msg_id})
MERGE (u)-[r:send]->(m);";
executeCypher($cyp);

echo "import relations_etc.csv\n";
$cyp="USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///".$exchangeFolder."/relations_etc.csv" AS row
MATCH(u:user{uid:row.user_id})
MATCH(m:message{mid:row.msg_id})
MERGE (m)-[r:receive]->(u)
SET r.type=row.type;";
executeCypher($cyp);

//clear after import
cleanFolder($exchangeFolder);


Hier sieht man wie der Importer die 50.000 User, 100.000 Messages und insgesamt 300.000 Relations von einer MySQL in die Neo4J Instanz importiert.



Die Festplatte ist nur über SATA-2 Angeschlossen und nicht besonders schnell. Eine SSD, wie für Neo4J empfohlen, würde alles sehr beschleunigen.

Zum Löschen aller Daten aus der Neo4J kann man diese Statement verwenden:


MATCH (n)
DETACH DELETE n
User annonyme 2016-03-16 20:12

write comment:
Seven + = 9