In [0]:
# Install Pytorch & other libraries
# NOTE(review): tensorboard is unpinned, so a re-run may resolve a different version.
%pip install torch==2.6.0 tensorboard

# Install Hugging Face libraries
# NOTE(review): evaluate, peft and protobuf are unpinned; consider pinning them for reproducible runs.
%pip install transformers==4.55.0 datasets==4.0.0 accelerate==1.10.0 evaluate trl==0.21.0 peft protobuf sentencepiece==0.2.0

# flash-attn is only useful on GPUs that support BF16 and FlashAttention (e.g. NVIDIA L4 or NVIDIA A100);
# comment this line out on other hardware. --no-build-isolation lets the build use the torch installed above.
%pip install flash-attn --no-build-isolation
# mlflow backs report_to='mlflow' in the training config; tiktoken is used by count_tokens.
%pip install mlflow tiktoken
# Restart the Python process so the newly installed package versions are the ones imported below.
dbutils.library.restartPython()

In [0]:
# AWS EC2 instance-type reference (for picking a GPU instance, e.g. one with an NVIDIA L4 or A100): https://aws.amazon.com/ec2/instance-types/

In [0]:
import torch 
import transformers
import accelerate
import trl
import sentencepiece
import datasets 

# Print each core library's installed version to confirm the pinned installs took effect.
for version_label, library in (
    ("Torch version: ", torch),
    ("Transformers version: ", transformers),
    ("Accelerate version: ", accelerate),
    ("TRL version: ", trl),
    ("Sentencepiece version: ", sentencepiece),
    ("Datasets version: ", datasets),
):
    print(version_label, library.__version__)

In [0]:
from huggingface_hub import login
from datasets import load_dataset, Dataset, DatasetDict
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import pipeline
from random import randint
import re
import tiktoken
from pathlib import Path
import json
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from sklearn.metrics import classification_report
from trl import SFTConfig
import torch 
from trl import SFTTrainer


In [0]:

def count_tokens(text, model="gpt2"):
    """Count the tokens in `text` using tiktoken's encoding for `model`.

    This is an approximate length measure used for filtering. The default "gpt2"
    encoding is not the tokenizer of the model being fine-tuned, so counts can
    differ from what that model actually sees.

    Args:
        text: String to measure. ``None`` (a SQL NULL when called through the
            Spark UDF below) returns ``None`` instead of raising.
        model: Model name accepted by ``tiktoken.encoding_for_model``.

    Returns:
        Number of tokens as an int, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        # Propagate NULL rather than crashing the whole Spark job on encode(None).
        return None
    encoding = tiktoken.encoding_for_model(model)
    # disallowed_special=() encodes special-token strings such as "<|endoftext|>" that occur
    # in free text as ordinary text; tiktoken's default raises ValueError on them.
    return len(encoding.encode(text, disallowed_special=()))


# Spark UDF wrapper: a NULL `text` yields a NULL count (count_tokens returns None).
count_tokens_udf = F.udf(count_tokens, returnType=IntegerType())

In [0]:
# Authenticate with the Hugging Face Hub (used to download the Gemma weights and to push models later).
# When the HF_TOKEN environment variable is set, it is used so the notebook can run unattended;
# otherwise token=None makes login() fall back to the interactive prompt, exactly as before.
import os
login(token=os.environ.get("HF_TOKEN"))

In [0]:
# does not work with python version == 3.11.11. It works with Python version: 3.12.3
import importlib.util

base_model = "google/gemma-3-270m-it"

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# FlashAttention-2 needs a CUDA GPU of compute capability >= 8.0 (Ampere or newer) and the
# `flash_attn` package. Otherwise fall back to PyTorch SDPA so the notebook still loads the
# model (e.g. on CPU, or on a cluster where the flash-attn install was skipped).
use_flash_attention = (
    device == 'cuda'
    and torch.cuda.get_device_capability()[0] >= 8
    and importlib.util.find_spec('flash_attn') is not None
)
attn_implementation = 'flash_attention_2' if use_flash_attention else 'sdpa'

model = AutoModelForCausalLM.from_pretrained(
    base_model,
    torch_dtype=torch.bfloat16,
    device_map=device,
    attn_implementation=attn_implementation,
)

# The KV cache is only useful for generation; it is turned off for training
# (it is also incompatible with gradient checkpointing if that is enabled later).
model.config.use_cache = False

tokenizer = AutoTokenizer.from_pretrained(base_model)


In [0]:
# # Inspect the pad token (tokenizer.padding_side shows which side sequences are padded on)
# tokenizer.pad_token_id

In [0]:
# Confirm where the model weights were placed and which dtype they use.
print(f"Device: {model.device}\nDType: {model.dtype}")

In [0]:
# System prompt for the shared-space classifier. It is sent with the example request below
# and copied verbatim into every training row by prep_for_training, so editing this text
# changes both the inference behaviour and the training data.
# The expected answer is exactly one of: yes | no | clarification (the same values as the
# lower-cased `decision` column used as the assistant target).
system_prompt = """You are a policy classifier for Vrbo listings, reviews, and host-traveler exchanges. Decide if a listing violates Vrbo’s Shared Space Policy. Vrbo does not allow rentals where guests share internal living areas (bedroom, bathroom, kitchen, living room, interior hallway) with the host or other unrelated guests.

Allowed: Shared external spaces (yard, driveway, patio, pool, etc.).

Requirements for a compliant listing
	•	Private, secure entrance (guest-controlled lock)
	•	Private bathroom
	•	No shared internal living areas with host/other guests

Classification Options
	•	yes = Explicit or strong implication of shared internal space.
	•	no = Private internal spaces, or only shared external spaces.
	•	clarification = Possible but unclear sharing, or contradictory info.

Output Format
yes | no | clarification"""

# Sanity check: classify one hand-written listing description with the base model.
pipe = pipeline('text-generation', model=model, tokenizer=tokenizer)

example_prompt = """Welcome to the heart of the bay area. You'll find yourself conveniently located to downtown San Mateo and all the transportation. This is a shared walkway, 1 bed guest suite. The unit has 1 queen sized bed with ample pillows, , mini kitchen, fast wifi, 2 x 43\" TV, Netflix, coffee, fast wifi. Self-check in. Mini Kitchen includes: Refrigerator, cook-top, microwave, Keurig / Keurig pods, utensils, cookware. Bathroom includes, towels, blow dryer, iron / ironing board, hand soap, shampoo, conditioner, body wash. Living area has pull out queen sofa bed with mattress topper, blanket, pillows, and sheets."""

example = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": example_prompt},
]

# Rendered chat-template text; kept for inspection only (the pipeline receives `example`).
prompt = pipe.tokenizer.apply_chat_template(
    example,
    add_generation_prompt=True,
    tokenize=False,
)

output = pipe(
    example,
    max_new_tokens=500,
    disable_compile=True,
    truncation=True,
    return_full_text=False,
)

output

In [0]:
# All three labelled sources share one schema and one S3 prefix.
SELECTED_COLUMNS = ['vrbo_property_id', 'text', 'decision', 'reasoning', 'taxonomy', 'label_excerpt', 'data_source']
BASE_PATH = "s3://apiary-analytics-927134741764-us-east-1-mxt-ml/hyemam/shared_spaces/discovery/model_training/v3/"
PATH = BASE_PATH + "listing_description_labels/"
LAB_REVIEW_PATH = BASE_PATH + "reviews_labelled/"
MSG_INQUIRY_PATH = BASE_PATH + "message_inquiries_labelled/"
# Rows whose text exceeds this many tokens (as measured by count_tokens_udf) are dropped.
MAX_TEXT_TOKENS = 512


def _load_labelled_source(path, source_name):
    """Read one labelled parquet source, tag it with `data_source`, and keep SELECTED_COLUMNS."""
    return (
        spark.read.parquet(path)
        .withColumn('data_source', F.lit(source_name))
        .select(*SELECTED_COLUMNS)
    )


listing_description_labels = _load_labelled_source(PATH, 'listing_description')
reviews_labelled = _load_labelled_source(LAB_REVIEW_PATH, 'post_stay_reviews')
message_inquiries_labelled = _load_labelled_source(MSG_INQUIRY_PATH, 'msg_inquiry')

# To also drop ambiguous rows, add: .filter("decision != 'clarification'")
data = (
    listing_description_labels
    .union(reviews_labelled)
    .union(message_inquiries_labelled)
    # Normalise the label BEFORE filtering on it; otherwise variants such as 'Unsure' or
    # ' unsure ' survive the filter and end up labelled 0 like 'no'.
    .withColumn("decision", F.lower(F.trim('decision')))
    .filter("decision != 'unsure'")
    # Token counting (a Python UDF) runs after the cheap filter, on fewer rows.
    # A NULL count compares as NULL and is dropped by the filter, as before.
    .withColumn("num_token", count_tokens_udf(F.col("text")))
    .filter(F.col("num_token") <= MAX_TEXT_TOKENS)
    .withColumn("label", F.when(F.col("decision") == 'yes', 1).otherwise(0))
)

total_count = data.count()

# Share of rows contributed by each source.
(
    data.groupBy('data_source').count()
    .withColumn("total_count", F.lit(total_count))
    .withColumn("percentage", F.round(F.col("count") * 100 / F.col("total_count"), 1))
    .display()
)

In [0]:
# Row counts for every (decision, data_source) combination.
(
    data
    .groupBy('decision', 'data_source')
    .count()
    .display()
)

In [0]:
# 80/20 train/test split; the fixed seed keeps the split reproducible for the same input data.
train_dataset, test_dataset = data.randomSplit(weights=[0.8, 0.2], seed=42)

In [0]:
def _show_label_distribution(split_df, split_name):
    """Print a heading, then display (label, decision) counts and percentages for one split.

    Returns the split's total row count.
    """
    split_count = split_df.count()
    print(f'{split_name} label distribution:')
    (
        split_df.groupBy('label', 'decision').count()
        .withColumn("total_count", F.lit(split_count))
        .withColumn("percentage", F.round(F.col("count") * 100 / F.col("total_count"), 1))
        .display()
    )
    return split_count


# Same report for both splits (previously two copy-pasted blocks with differing print formats).
train_count = _show_label_distribution(train_dataset, 'Train')
test_count = _show_label_distribution(test_dataset, 'Test')

In [0]:
def prep_for_training(dataframe):
    """Build the per-row columns used to create chat-format training examples.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Must contain ``text`` (the user content) and ``decision`` (the target answer).
        It is not modified.

    Returns
    -------
    pandas.DataFrame
        A new frame with columns ``user``, ``assistant``, ``system_prompt`` and ``token``.
        ``token`` is the approximate count_tokens total of user + assistant + system prompt.
    """
    # The system prompt is identical on every row, so count its tokens once.
    system_prompt_tokens = count_tokens(system_prompt)
    # rename/assign return new frames: the caller's frame is untouched, and no column is set
    # on a slice (which previously raised SettingWithCopyWarning). Series.map also works on an
    # empty frame, where the old row-wise apply returned a DataFrame and the assignment failed.
    return (
        dataframe[['text', 'decision']]
        .rename(columns={'text': 'user', 'decision': 'assistant'})
        .assign(system_prompt=system_prompt)
        .assign(token=lambda frame: frame['user'].map(count_tokens)
                + frame['assistant'].map(count_tokens)
                + system_prompt_tokens)
    )

def create_conversation(sample):
    """Wrap one prepared row as a chat example.

    `sample` must provide the keys 'system_prompt', 'user' and 'assistant'.
    Returns {"messages": [...]} with system, user and assistant turns in that order,
    each as a {"role": ..., "content": ...} dict.
    """
    turns = (
        ("system", sample['system_prompt']),
        ("user", sample['user']),
        ("assistant", sample['assistant']),
    )
    messages = [{"role": role, "content": content} for role, content in turns]
    return {"messages": messages}



In [0]:
# Pandas frames with user / assistant / system_prompt / token columns for each split.
train_pandas = prep_for_training(train_dataset.toPandas())
test_pandas = prep_for_training(test_dataset.toPandas())

# One Hugging Face Dataset per split.
dataset_dict = DatasetDict({
    split_name: Dataset.from_pandas(split_frame)
    for split_name, split_frame in (("train", train_pandas), ("test", test_pandas))
})

# Convert every row to the {"messages": [...]} conversational format, dropping the source columns.
data_dataset = dataset_dict.map(
    create_conversation,
    batched=False,
    remove_columns=dataset_dict['train'].column_names,
)
data_dataset

In [0]:
# First training conversation, to eyeball the chat format.
data_dataset["train"][0]

In [0]:
from peft import LoraConfig


# LoRA adapter settings. The target module names must match layer names in the base model;
# adjust them for other architectures.
peft_config = LoraConfig(
    task_type="CAUSAL_LM",
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    target_modules=[
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ],
)

In [0]:
import os
import warnings

# Ask PyTorch's CUDA caching allocator to use expandable segments (less fragmentation).
# The allocator reads this variable when CUDA is first initialised, so setting it after the
# model has already been moved to the GPU does nothing for the current Python process.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
if torch.cuda.is_initialized():
    # Make the otherwise silent no-op visible instead of leaving the setting ineffective.
    warnings.warn(
        "PYTORCH_CUDA_ALLOC_CONF was set after CUDA was initialised and will not take effect "
        "in this process; set it before the model is loaded onto the GPU.",
        RuntimeWarning,
    )

In [0]:
# Largest approximate token count over both splits.
max(split_frame['token'].max() for split_frame in (train_pandas, test_pandas))

In [0]:

# dtype the model weights were loaded with
torch_dtype = model.dtype

# Max sequence length, measured with the model's own tokenizer on the fully rendered chat
# (system + user + assistant turns plus chat-template tokens). This approximates how
# SFTTrainer tokenizes "messages" data. The `token` column from prep_for_training uses
# tiktoken's gpt2 encoding and omits template tokens, so it is only an estimate. If it came
# out too small, the end of the longest sequences would be truncated, and the end is where
# the assistant answer (the training target) sits.
max_length = max(
    len(tokenizer.apply_chat_template(example['messages'], tokenize=True))
    for split in data_dataset.values()
    for example in split
)

# checkpoint directory
checkpoint_dir = '/Workspace/Users/hyemam@expediagroup.com/Trust_and_Safety/Shared_Spaces/LLMExperiments/TestingNewOpenSourceModels/Gemma-270M-it'

args = SFTConfig(
    # Local checkpoint directory. With push_to_hub=True and no hub_model_id, transformers
    # names the Hub repo after this directory's basename.
    output_dir=checkpoint_dir,
    max_length=max_length,  # longer sequences are truncated
    packing=False,  # one sample per sequence (packing=True would concatenate samples)
    num_train_epochs=3,
    per_device_train_batch_size=2,  # batch size per device during training
    gradient_checkpointing=False,  # if enabled, model.config.use_cache must stay False
    optim='adamw_torch_fused',
    logging_steps=1,  # log every step
    save_strategy='epoch',  # save a checkpoint every epoch
    eval_strategy='epoch',  # evaluate every epoch
    learning_rate=5e-5,
    fp16=False,
    bf16=True,  # matches the bfloat16 weights loaded above
    lr_scheduler_type='cosine',  # cosine learning-rate decay
    push_to_hub=True,
    report_to='mlflow',
    # NOTE(review): TRL 0.21 documents only 'skip_prepare_dataset' as a supported
    # dataset_kwargs key; these two appear to be ignored - confirm before relying on them.
    dataset_kwargs={'add_special_tokens': False, 'append_concat_token': True},
)

In [0]:


# Supervised fine-tuning trainer. Passing peft_config makes SFTTrainer wrap `model`
# with the LoRA adapters defined above.
trainer = SFTTrainer(
    model=model,
    args=args,
    processing_class=tokenizer,
    train_dataset=data_dataset['train'],
    eval_dataset=data_dataset['test'],
    peft_config=peft_config,
)

In [0]:

# Train from scratch (no checkpoint resume). Checkpoints are written to output_dir each epoch
# and, because push_to_hub=True, synced to the Hub repo configured in SFTConfig.
trainer.train(resume_from_checkpoint=None)

In [0]:

model_id = 'henokyemam/gemma-3-270m-it-sft-ssp-august21'

# Trainer.push_to_hub's first positional parameter is `commit_message`, not a repo id:
# `trainer.push_to_hub(model_id)` uploads to the repo configured in SFTConfig and only uses
# `model_id` as the commit message. Push the trained (LoRA-wrapped) model and its tokenizer
# directly to `model_id` instead. token=True uses the token stored by login().
trainer.model.push_to_hub(model_id, token=True)
tokenizer.push_to_hub(model_id, token=True)